Repository: apex-malhar
Updated Branches:
  refs/heads/master d6f9ed580 -> 349be9d9d


APEXMALHAR-2502 Fix KuduOutputOperator for extensibility


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/349be9d9
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/349be9d9
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/349be9d9

Branch: refs/heads/master
Commit: 349be9d9d9c02f0e114fc313ed3daf36b9456cf3
Parents: d6f9ed5
Author: Ananth <[email protected]>
Authored: Tue Jun 6 06:26:31 2017 +1000
Committer: Ananth <[email protected]>
Committed: Tue Jun 6 06:26:31 2017 +1000

----------------------------------------------------------------------
 .../apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java  | 7 +++++--
 .../apache/apex/malhar/contrib/kudu/ApexKuduConnection.java   | 2 +-
 .../apex/malhar/contrib/kudu/BaseKuduOutputOperator.java      | 4 ++++
 .../apache/apex/malhar/contrib/kudu/KuduExecutionContext.java | 6 +++---
 4 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/349be9d9/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
index 250334b..ff668b0 100644
--- 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/AbstractKuduOutputOperator.java
@@ -114,7 +114,7 @@ public abstract class AbstractKuduOutputOperator extends 
BaseOperator
   private static final transient Logger LOG = 
LoggerFactory.getLogger(AbstractKuduOutputOperator.class);
 
   @NotNull
-  private WindowDataManager windowDataManager;
+  protected WindowDataManager windowDataManager;
 
   private transient long currentWindowId;
 
@@ -231,7 +231,10 @@ public abstract class AbstractKuduOutputOperator extends 
BaseOperator
   private void performCommonProcessing(Operation currentOperation, 
KuduExecutionContext kuduExecutionContext)
   {
     
currentOperation.setExternalConsistencyMode(kuduExecutionContext.getExternalConsistencyMode());
-    
currentOperation.setPropagatedTimestamp(kuduExecutionContext.getPropagatedTimestamp());
+    Long propagatedTimeStamp = kuduExecutionContext.getPropagatedTimestamp();
+    if ( propagatedTimeStamp != null) { // set propagation timestamp only if 
enabled
+      currentOperation.setPropagatedTimestamp(propagatedTimeStamp);
+    }
     PartialRow partialRow = currentOperation.getRow();
     Object payload = kuduExecutionContext.getPayload();
     Set<String> doNotWriteColumns = 
kuduExecutionContext.getDoNotWriteColumns();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/349be9d9/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
index aed6b8b..99eaf1b 100644
--- 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/ApexKuduConnection.java
@@ -213,7 +213,7 @@ public class ApexKuduConnection implements AutoCloseable, 
Serializable
       return this;
     }
 
-    protected ApexKuduConnection build()
+    public ApexKuduConnection build()
     {
       ApexKuduConnection apexKuduConnection = new ApexKuduConnection(this);
       return apexKuduConnection;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/349be9d9/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
index 6de7190..6c7e9a6 100644
--- 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/BaseKuduOutputOperator.java
@@ -24,6 +24,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
 import org.apache.kudu.client.ExternalConsistencyMode;
 import org.apache.kudu.client.SessionConfiguration;
 
@@ -65,11 +67,13 @@ public class BaseKuduOutputOperator extends 
AbstractKuduOutputOperator
 
   public BaseKuduOutputOperator() throws IOException, ClassNotFoundException
   {
+    windowDataManager = new FSWindowDataManager();
     initConnectionBuilderProperties(DEFAULT_CONNECTION_PROPS_FILE_NAME);
   }
 
   public BaseKuduOutputOperator(String configFileInClasspath) throws 
IOException, ClassNotFoundException
   {
+    windowDataManager = new FSWindowDataManager();
     initConnectionBuilderProperties(configFileInClasspath);
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/349be9d9/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
index 27d382d..a346705 100644
--- 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/kudu/KuduExecutionContext.java
@@ -50,7 +50,7 @@ public class KuduExecutionContext<T>
 
   private ExternalConsistencyMode externalConsistencyMode;
 
-  private long propagatedTimestamp;
+  private Long propagatedTimestamp;
 
   public T getPayload()
   {
@@ -82,12 +82,12 @@ public class KuduExecutionContext<T>
     this.externalConsistencyMode = externalConsistencyMode;
   }
 
-  public long getPropagatedTimestamp()
+  public Long getPropagatedTimestamp()
   {
     return propagatedTimestamp;
   }
 
-  public void setPropagatedTimestamp(long propagatedTimestamp)
+  public void setPropagatedTimestamp(Long propagatedTimestamp)
   {
     this.propagatedTimestamp = propagatedTimestamp;
   }

Reply via email to