Repository: apex-malhar Updated Branches: refs/heads/master d6f9ed580 -> 349be9d9d
APEXMALHAR-2502 #FixKuduOutputOperator for extensibility Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/349be9d9 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/349be9d9 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/349be9d9 Branch: refs/heads/master Commit: 349be9d9d9c02f0e114fc313ed3daf36b9456cf3 Parents: d6f9ed5 Author: Ananth <[email protected]> Authored: Tue Jun 6 06:26:31 2017 +1000 Committer: Ananth <[email protected]> Committed: Tue Jun 6 06:26:31 2017 +1000 ---------------------------------------------------------------------- .../apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java | 7 +++++-- .../apache/apex/malhar/contrib/kudu/ApexKuduConnection.java | 2 +- .../apex/malhar/contrib/kudu/BaseKuduOutputOperator.java | 4 ++++ .../apache/apex/malhar/contrib/kudu/KuduExecutionContext.java | 6 +++--- 4 files changed, 13 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/349be9d9/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java index 250334b..ff668b0 100644 --- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java @@ -114,7 +114,7 @@ public abstract class AbstractKuduOutputOperator extends BaseOperator private static final transient Logger LOG = LoggerFactory.getLogger(AbstractKuduOutputOperator.class); @NotNull - private WindowDataManager windowDataManager; + protected WindowDataManager windowDataManager; private transient long currentWindowId; @@ -231,7 +231,10 @@ public abstract class AbstractKuduOutputOperator extends BaseOperator private void performCommonProcessing(Operation currentOperation, KuduExecutionContext kuduExecutionContext) { currentOperation.setExternalConsistencyMode(kuduExecutionContext.getExternalConsistencyMode()); - currentOperation.setPropagatedTimestamp(kuduExecutionContext.getPropagatedTimestamp()); + Long propagatedTimeStamp = kuduExecutionContext.getPropagatedTimestamp(); + if ( propagatedTimeStamp != null) { // set propagation timestamp only if enabled + currentOperation.setPropagatedTimestamp(propagatedTimeStamp); + } PartialRow partialRow = currentOperation.getRow(); Object payload = kuduExecutionContext.getPayload(); Set<String> doNotWriteColumns = kuduExecutionContext.getDoNotWriteColumns(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/349be9d9/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java index aed6b8b..99eaf1b 100644 --- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java @@ -213,7 +213,7 @@ public class ApexKuduConnection implements AutoCloseable, Serializable return this; } - protected ApexKuduConnection build() + public ApexKuduConnection build() { ApexKuduConnection apexKuduConnection = new ApexKuduConnection(this); return apexKuduConnection; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/349be9d9/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java index 6de7190..6c7e9a6 100644 --- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java @@ -24,6 +24,8 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.kudu.client.ExternalConsistencyMode; import org.apache.kudu.client.SessionConfiguration; @@ -65,11 +67,13 @@ public class BaseKuduOutputOperator extends AbstractKuduOutputOperator public BaseKuduOutputOperator() throws IOException, ClassNotFoundException { + windowDataManager = new FSWindowDataManager(); initConnectionBuilderProperties(DEFAULT_CONNECTION_PROPS_FILE_NAME); } public BaseKuduOutputOperator(String configFileInClasspath) throws IOException, ClassNotFoundException { + windowDataManager = new FSWindowDataManager(); initConnectionBuilderProperties(configFileInClasspath); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/349be9d9/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java index 27d382d..a346705 100644 --- a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java @@ -50,7 +50,7 @@ public class KuduExecutionContext<T> private ExternalConsistencyMode externalConsistencyMode; - private long propagatedTimestamp; + private Long propagatedTimestamp; public T getPayload() { @@ -82,12 +82,12 @@ public class KuduExecutionContext<T> this.externalConsistencyMode = externalConsistencyMode; } - public long getPropagatedTimestamp() + public Long getPropagatedTimestamp() { return propagatedTimestamp; } - public void setPropagatedTimestamp(long propagatedTimestamp) + public void setPropagatedTimestamp(Long propagatedTimestamp) { this.propagatedTimestamp = propagatedTimestamp; }
