This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d908aa  [GOBBLIN-874] Make WorkUnitPacker and SizeEstimator pluggable
0d908aa is described below

commit 0d908aa79be7cabb805458c1c698f241e4ccf519
Author: autumnust <[email protected]>
AuthorDate: Mon Sep 9 14:31:58 2019 -0700

    [GOBBLIN-874] Make WorkUnitPacker and SizeEstimator pluggable
    
    Closes #2728 from autumnust/kafkasource
---
 ...fkaAvgRecordSizeBasedWorkUnitSizeEstimator.java |  2 +-
 ...fkaAvgRecordTimeBasedWorkUnitSizeEstimator.java |  2 +-
 .../packer/KafkaBiLevelWorkUnitPacker.java         |  4 +-
 .../packer/KafkaSingleLevelWorkUnitPacker.java     |  2 +-
 .../kafka/workunit/packer/KafkaWorkUnitPacker.java | 32 +++++++--
 .../workunit/packer/KafkaWorkUnitPackerTest.java   | 79 ++++++++++++++++++++++
 6 files changed, 111 insertions(+), 10 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordSizeBasedWorkUnitSizeEstimator.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordSizeBasedWorkUnitSizeEstimator.java
index f80c7c4..a687d93 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordSizeBasedWorkUnitSizeEstimator.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordSizeBasedWorkUnitSizeEstimator.java
@@ -53,7 +53,7 @@ public class KafkaAvgRecordSizeBasedWorkUnitSizeEstimator implements KafkaWorkUn
 
   private final Map<KafkaPartition, Long> estAvgSizes = Maps.newHashMap();
 
-  KafkaAvgRecordSizeBasedWorkUnitSizeEstimator(SourceState state) {
+  public KafkaAvgRecordSizeBasedWorkUnitSizeEstimator(SourceState state) {
     readPreAvgRecordSizes(state);
   }
 
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordTimeBasedWorkUnitSizeEstimator.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordTimeBasedWorkUnitSizeEstimator.java
index e0539a5..3346fcc 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordTimeBasedWorkUnitSizeEstimator.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordTimeBasedWorkUnitSizeEstimator.java
@@ -63,7 +63,7 @@ public class KafkaAvgRecordTimeBasedWorkUnitSizeEstimator implements KafkaWorkUn
   private final Map<String, Double> estAvgMillis = Maps.newHashMap();
   private double avgEstAvgMillis = 0.0;
 
-  KafkaAvgRecordTimeBasedWorkUnitSizeEstimator(SourceState state) {
+  public KafkaAvgRecordTimeBasedWorkUnitSizeEstimator(SourceState state) {
     readPrevAvgRecordMillis(state);
   }
 
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
index 9c134b8..f85fd97 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
@@ -58,7 +58,7 @@ public class KafkaBiLevelWorkUnitPacker extends KafkaWorkUnitPacker {
   public static final String WORKUNIT_PRE_GROUPING_SIZE_FACTOR = 
"workunit.pre.grouping.size.factor";
   public static final double DEFAULT_WORKUNIT_PRE_GROUPING_SIZE_FACTOR = 3.0;
 
-  protected KafkaBiLevelWorkUnitPacker(AbstractSource<?, ?> source, 
SourceState state) {
+  public KafkaBiLevelWorkUnitPacker(AbstractSource<?, ?> source, SourceState 
state) {
     super(source, state);
   }
 
@@ -103,7 +103,7 @@ public class KafkaBiLevelWorkUnitPacker extends KafkaWorkUnitPacker {
    * Group {@link WorkUnit}s into groups. Each group is a {@link 
MultiWorkUnit}. Each group has a capacity of
    * avgGroupSize. If there's a single {@link WorkUnit} whose size is larger 
than avgGroupSize, it forms a group itself.
    */
-  private static List<MultiWorkUnit> 
bestFitDecreasingBinPacking(List<WorkUnit> workUnits, double avgGroupSize) {
+  static List<MultiWorkUnit> bestFitDecreasingBinPacking(List<WorkUnit> 
workUnits, double avgGroupSize) {
 
     // Sort workunits by data size desc
     Collections.sort(workUnits, LOAD_DESC_COMPARATOR);
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaSingleLevelWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaSingleLevelWorkUnitPacker.java
index b731e96..8c71a6a 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaSingleLevelWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaSingleLevelWorkUnitPacker.java
@@ -43,7 +43,7 @@ import org.apache.gobblin.source.workunit.WorkUnit;
  */
 public class KafkaSingleLevelWorkUnitPacker extends KafkaWorkUnitPacker {
 
-  protected KafkaSingleLevelWorkUnitPacker(AbstractSource<?, ?> source, 
SourceState state) {
+  public KafkaSingleLevelWorkUnitPacker(AbstractSource<?, ?> source, 
SourceState state) {
     super(source, state);
   }
 
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index a8dfae4..4630d6c 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,7 +44,6 @@ import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
 import org.apache.gobblin.source.extractor.extract.kafka.MultiLongWatermark;
-import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
 
@@ -58,20 +58,33 @@ public abstract class KafkaWorkUnitPacker {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(KafkaWorkUnitPacker.class);
 
+  /**
+   * For customized type of the following enums, it will try to find declared 
class in classpath
+   * and fallback to exception if ClassNotFound. This way the sizeEstimator 
and packer could be easier to
+   * extend. The major purpose for keeping this enum instead of using 
reflection only to construct the instance
+   * of packer or sizeEstimator is to maintain backward-compatibility.
+   *
+   * The constructor of customized type needs to be annotated with public 
access-modifier as it is instantiated by
+   * {@link GobblinConstructorUtils} which reside in different package, and it 
needs to have the same signature
+   * as other implementation under the enum.
+   */
   public enum PackerType {
     SINGLE_LEVEL,
-    BI_LEVEL
+    BI_LEVEL,
+    CUSTOM
   }
 
   public enum SizeEstimatorType {
     AVG_RECORD_TIME,
-    AVG_RECORD_SIZE
+    AVG_RECORD_SIZE, CUSTOM
   }
 
   public static final String KAFKA_WORKUNIT_PACKER_TYPE = 
"kafka.workunit.packer.type";
+  public static final String KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE = 
"kafka.workunit.packer.customizedType";
   private static final PackerType DEFAULT_PACKER_TYPE = 
PackerType.SINGLE_LEVEL;
 
   public static final String KAFKA_WORKUNIT_SIZE_ESTIMATOR_TYPE = 
"kafka.workunit.size.estimator.type";
+  public static final String KAFKA_WORKUNIT_SIZE_ESTIMATOR_CUSTOMIZED_TYPE = 
"kafka.workunit.size.estimator.customizedType";
   private static final SizeEstimatorType DEFAULT_SIZE_ESTIMATOR_TYPE = 
SizeEstimatorType.AVG_RECORD_TIME;
 
   protected static final double EPS = 0.01;
@@ -109,7 +122,8 @@ public abstract class KafkaWorkUnitPacker {
     workUnit.setProp(ESTIMATED_WORKUNIT_SIZE, 
this.sizeEstimator.calcEstimatedSize(workUnit));
   }
 
-  private KafkaWorkUnitSizeEstimator getWorkUnitSizeEstimator() {
+  // Setting to package-private for unit-testing purpose.
+  KafkaWorkUnitSizeEstimator getWorkUnitSizeEstimator() {
     if (this.state.contains(KAFKA_WORKUNIT_SIZE_ESTIMATOR_TYPE)) {
       String sizeEstimatorTypeString = 
this.state.getProp(KAFKA_WORKUNIT_SIZE_ESTIMATOR_TYPE);
       Optional<SizeEstimatorType> sizeEstimatorType =
@@ -128,6 +142,10 @@ public abstract class KafkaWorkUnitPacker {
         return new KafkaAvgRecordTimeBasedWorkUnitSizeEstimator(this.state);
       case AVG_RECORD_SIZE:
         return new KafkaAvgRecordSizeBasedWorkUnitSizeEstimator(this.state);
+      case CUSTOM:
+        
Preconditions.checkArgument(this.state.contains(KAFKA_WORKUNIT_SIZE_ESTIMATOR_CUSTOMIZED_TYPE));
+        String className = 
this.state.getProp(KAFKA_WORKUNIT_SIZE_ESTIMATOR_CUSTOMIZED_TYPE);
+        return 
GobblinConstructorUtils.invokeConstructor(KafkaWorkUnitSizeEstimator.class, 
className, this.state);
       default:
         throw new IllegalArgumentException("WorkUnit size estimator type " + 
sizeEstimatorType + " not found");
     }
@@ -265,7 +283,7 @@ public abstract class KafkaWorkUnitPacker {
     }
   }
 
-  private static List<KafkaPartition> 
getPartitionsFromMultiWorkUnit(MultiWorkUnit multiWorkUnit) {
+  static List<KafkaPartition> getPartitionsFromMultiWorkUnit(MultiWorkUnit 
multiWorkUnit) {
     List<KafkaPartition> partitions = Lists.newArrayList();
 
     for (WorkUnit workUnit : multiWorkUnit.getWorkUnits()) {
@@ -357,6 +375,10 @@ public abstract class KafkaWorkUnitPacker {
         return new KafkaSingleLevelWorkUnitPacker(source, state);
       case BI_LEVEL:
         return new KafkaBiLevelWorkUnitPacker(source, state);
+      case CUSTOM:
+        
Preconditions.checkArgument(state.contains(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE));
+        String className = 
state.getProp(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE);
+        return 
GobblinConstructorUtils.invokeConstructor(KafkaWorkUnitPacker.class, className, 
source, state);
       default:
         throw new IllegalArgumentException("WorkUnit packer type " + 
packerType + " not found");
     }
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
new file mode 100644
index 0000000..e392566
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.source.extractor.extract.AbstractSource;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static 
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE;
+import static 
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_TYPE;
+import static 
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_SIZE_ESTIMATOR_CUSTOMIZED_TYPE;
+import static 
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_SIZE_ESTIMATOR_TYPE;
+
+
+public class KafkaWorkUnitPackerTest {
+  private KafkaWorkUnitPacker packer;
+  AbstractSource source = Mockito.mock(AbstractSource.class);
+  SourceState state;
+
+  @BeforeMethod
+  public void setUp() {
+    state = new SourceState();
+
+    // Using customized type and having customized as a known class.
+    state.setProp(KAFKA_WORKUNIT_PACKER_TYPE, "CUSTOM");
+    state.setProp(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE,
+        
"org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaSingleLevelWorkUnitPacker");
+    state.setProp(KAFKA_WORKUNIT_SIZE_ESTIMATOR_TYPE, "CUSTOM");
+    state.setProp(KAFKA_WORKUNIT_SIZE_ESTIMATOR_CUSTOMIZED_TYPE,
+        
"org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaAvgRecordTimeBasedWorkUnitSizeEstimator");
+    packer = new TestKafkaWorkUnitPacker(source, state);
+  }
+
+  @Test
+  public void testGetWorkUnitSizeEstimator() {
+    KafkaWorkUnitSizeEstimator estimator = packer.getWorkUnitSizeEstimator();
+    Assert.assertTrue(estimator instanceof 
KafkaAvgRecordTimeBasedWorkUnitSizeEstimator);
+  }
+
+  @Test
+  public void testGetInstance() {
+    KafkaWorkUnitPacker anotherPacker = 
KafkaWorkUnitPacker.getInstance(source, state);
+    Assert.assertTrue(anotherPacker instanceof KafkaSingleLevelWorkUnitPacker);
+  }
+
+  public class TestKafkaWorkUnitPacker extends KafkaWorkUnitPacker {
+    public TestKafkaWorkUnitPacker(AbstractSource<?, ?> source, SourceState 
state) {
+      super(source, state);
+    }
+
+    // Dummy implementation for making abstract class instantiable only.
+    @Override
+    public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, 
int numContainers) {
+      return null;
+    }
+  }
+}
\ No newline at end of file

Reply via email to