This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0d908aa [GOBBLIN-874] Make WorkUnitPacker and SizeEstimator pluggable
0d908aa is described below
commit 0d908aa79be7cabb805458c1c698f241e4ccf519
Author: autumnust <[email protected]>
AuthorDate: Mon Sep 9 14:31:58 2019 -0700
[GOBBLIN-874] Make WorkUnitPacker and SizeEstimator pluggable
Closes #2728 from autumnust/kafkasource
---
...fkaAvgRecordSizeBasedWorkUnitSizeEstimator.java | 2 +-
...fkaAvgRecordTimeBasedWorkUnitSizeEstimator.java | 2 +-
.../packer/KafkaBiLevelWorkUnitPacker.java | 4 +-
.../packer/KafkaSingleLevelWorkUnitPacker.java | 2 +-
.../kafka/workunit/packer/KafkaWorkUnitPacker.java | 32 +++++++--
.../workunit/packer/KafkaWorkUnitPackerTest.java | 79 ++++++++++++++++++++++
6 files changed, 111 insertions(+), 10 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordSizeBasedWorkUnitSizeEstimator.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordSizeBasedWorkUnitSizeEstimator.java
index f80c7c4..a687d93 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordSizeBasedWorkUnitSizeEstimator.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordSizeBasedWorkUnitSizeEstimator.java
@@ -53,7 +53,7 @@ public class KafkaAvgRecordSizeBasedWorkUnitSizeEstimator implements KafkaWorkUn
private final Map<KafkaPartition, Long> estAvgSizes = Maps.newHashMap();
- KafkaAvgRecordSizeBasedWorkUnitSizeEstimator(SourceState state) {
+ public KafkaAvgRecordSizeBasedWorkUnitSizeEstimator(SourceState state) {
readPreAvgRecordSizes(state);
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordTimeBasedWorkUnitSizeEstimator.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordTimeBasedWorkUnitSizeEstimator.java
index e0539a5..3346fcc 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordTimeBasedWorkUnitSizeEstimator.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaAvgRecordTimeBasedWorkUnitSizeEstimator.java
@@ -63,7 +63,7 @@ public class KafkaAvgRecordTimeBasedWorkUnitSizeEstimator implements KafkaWorkUn
private final Map<String, Double> estAvgMillis = Maps.newHashMap();
private double avgEstAvgMillis = 0.0;
- KafkaAvgRecordTimeBasedWorkUnitSizeEstimator(SourceState state) {
+ public KafkaAvgRecordTimeBasedWorkUnitSizeEstimator(SourceState state) {
readPrevAvgRecordMillis(state);
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
index 9c134b8..f85fd97 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaBiLevelWorkUnitPacker.java
@@ -58,7 +58,7 @@ public class KafkaBiLevelWorkUnitPacker extends KafkaWorkUnitPacker {
public static final String WORKUNIT_PRE_GROUPING_SIZE_FACTOR =
"workunit.pre.grouping.size.factor";
public static final double DEFAULT_WORKUNIT_PRE_GROUPING_SIZE_FACTOR = 3.0;
- protected KafkaBiLevelWorkUnitPacker(AbstractSource<?, ?> source,
SourceState state) {
+ public KafkaBiLevelWorkUnitPacker(AbstractSource<?, ?> source, SourceState
state) {
super(source, state);
}
@@ -103,7 +103,7 @@ public class KafkaBiLevelWorkUnitPacker extends KafkaWorkUnitPacker {
* Group {@link WorkUnit}s into groups. Each group is a {@link
MultiWorkUnit}. Each group has a capacity of
* avgGroupSize. If there's a single {@link WorkUnit} whose size is larger
than avgGroupSize, it forms a group itself.
*/
- private static List<MultiWorkUnit>
bestFitDecreasingBinPacking(List<WorkUnit> workUnits, double avgGroupSize) {
+ static List<MultiWorkUnit> bestFitDecreasingBinPacking(List<WorkUnit>
workUnits, double avgGroupSize) {
// Sort workunits by data size desc
Collections.sort(workUnits, LOAD_DESC_COMPARATOR);
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaSingleLevelWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaSingleLevelWorkUnitPacker.java
index b731e96..8c71a6a 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaSingleLevelWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaSingleLevelWorkUnitPacker.java
@@ -43,7 +43,7 @@ import org.apache.gobblin.source.workunit.WorkUnit;
*/
public class KafkaSingleLevelWorkUnitPacker extends KafkaWorkUnitPacker {
- protected KafkaSingleLevelWorkUnitPacker(AbstractSource<?, ?> source,
SourceState state) {
+ public KafkaSingleLevelWorkUnitPacker(AbstractSource<?, ?> source,
SourceState state) {
super(source, state);
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
index a8dfae4..4630d6c 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPacker.java
@@ -23,6 +23,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +44,6 @@ import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
import org.apache.gobblin.source.extractor.extract.kafka.MultiLongWatermark;
-import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
@@ -58,20 +58,33 @@ public abstract class KafkaWorkUnitPacker {
private static final Logger LOG =
LoggerFactory.getLogger(KafkaWorkUnitPacker.class);
+  /**
+   * Built-in packer and size-estimator types. {@code CUSTOM} plugs in an implementation that is not listed here:
+   * its fully-qualified class name is read from the corresponding {@code *_CUSTOMIZED_TYPE} property and
+   * instantiated reflectively, and an exception is thrown if the class cannot be found or constructed. The enums
+   * are kept, rather than relying on reflection alone, for backward compatibility with existing configurations.
+   *
+   * A custom class must declare a public constructor, because it is instantiated by {@link GobblinConstructorUtils},
+   * which resides in a different package. That constructor must have the same signature as the built-in
+   * implementations: {@code (AbstractSource<?, ?>, SourceState)} for packers, {@code (SourceState)} for estimators.
+   */
public enum PackerType {
SINGLE_LEVEL,
- BI_LEVEL
+ BI_LEVEL,
+ CUSTOM
}
public enum SizeEstimatorType {
AVG_RECORD_TIME,
- AVG_RECORD_SIZE
+ AVG_RECORD_SIZE, CUSTOM
}
public static final String KAFKA_WORKUNIT_PACKER_TYPE =
"kafka.workunit.packer.type";
+ public static final String KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE =
"kafka.workunit.packer.customizedType";
private static final PackerType DEFAULT_PACKER_TYPE =
PackerType.SINGLE_LEVEL;
public static final String KAFKA_WORKUNIT_SIZE_ESTIMATOR_TYPE =
"kafka.workunit.size.estimator.type";
+ public static final String KAFKA_WORKUNIT_SIZE_ESTIMATOR_CUSTOMIZED_TYPE =
"kafka.workunit.size.estimator.customizedType";
private static final SizeEstimatorType DEFAULT_SIZE_ESTIMATOR_TYPE =
SizeEstimatorType.AVG_RECORD_TIME;
protected static final double EPS = 0.01;
@@ -109,7 +122,8 @@ public abstract class KafkaWorkUnitPacker {
workUnit.setProp(ESTIMATED_WORKUNIT_SIZE,
this.sizeEstimator.calcEstimatedSize(workUnit));
}
- private KafkaWorkUnitSizeEstimator getWorkUnitSizeEstimator() {
+ // Setting to package-private for unit-testing purpose.
+ KafkaWorkUnitSizeEstimator getWorkUnitSizeEstimator() {
if (this.state.contains(KAFKA_WORKUNIT_SIZE_ESTIMATOR_TYPE)) {
String sizeEstimatorTypeString =
this.state.getProp(KAFKA_WORKUNIT_SIZE_ESTIMATOR_TYPE);
Optional<SizeEstimatorType> sizeEstimatorType =
@@ -128,6 +142,10 @@ public abstract class KafkaWorkUnitPacker {
return new KafkaAvgRecordTimeBasedWorkUnitSizeEstimator(this.state);
case AVG_RECORD_SIZE:
return new KafkaAvgRecordSizeBasedWorkUnitSizeEstimator(this.state);
+ case CUSTOM:
+
Preconditions.checkArgument(this.state.contains(KAFKA_WORKUNIT_SIZE_ESTIMATOR_CUSTOMIZED_TYPE));
+ String className =
this.state.getProp(KAFKA_WORKUNIT_SIZE_ESTIMATOR_CUSTOMIZED_TYPE);
+ return
GobblinConstructorUtils.invokeConstructor(KafkaWorkUnitSizeEstimator.class,
className, this.state);
default:
throw new IllegalArgumentException("WorkUnit size estimator type " +
sizeEstimatorType + " not found");
}
@@ -265,7 +283,7 @@ public abstract class KafkaWorkUnitPacker {
}
}
- private static List<KafkaPartition>
getPartitionsFromMultiWorkUnit(MultiWorkUnit multiWorkUnit) {
+ static List<KafkaPartition> getPartitionsFromMultiWorkUnit(MultiWorkUnit
multiWorkUnit) {
List<KafkaPartition> partitions = Lists.newArrayList();
for (WorkUnit workUnit : multiWorkUnit.getWorkUnits()) {
@@ -357,6 +375,10 @@ public abstract class KafkaWorkUnitPacker {
return new KafkaSingleLevelWorkUnitPacker(source, state);
case BI_LEVEL:
return new KafkaBiLevelWorkUnitPacker(source, state);
+ case CUSTOM:
+
Preconditions.checkArgument(state.contains(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE));
+ String className =
state.getProp(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE);
+ return
GobblinConstructorUtils.invokeConstructor(KafkaWorkUnitPacker.class, className,
source, state);
default:
throw new IllegalArgumentException("WorkUnit packer type " +
packerType + " not found");
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
new file mode 100644
index 0000000..e392566
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaWorkUnitPackerTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.source.extractor.extract.AbstractSource;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE;
+import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_PACKER_TYPE;
+import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_SIZE_ESTIMATOR_CUSTOMIZED_TYPE;
+import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaWorkUnitPacker.KAFKA_WORKUNIT_SIZE_ESTIMATOR_TYPE;
+
+/** Verifies that CUSTOM packer and size-estimator types are instantiated reflectively from configuration. */
+public class KafkaWorkUnitPackerTest {
+  private KafkaWorkUnitPacker packer;
+  private final AbstractSource<?, ?> source = Mockito.mock(AbstractSource.class);
+  private SourceState state;
+
+  @BeforeMethod
+  public void setUp() {
+    state = new SourceState();
+
+    // Select the CUSTOM types and point them at existing implementations so they can be loaded reflectively.
+    state.setProp(KAFKA_WORKUNIT_PACKER_TYPE, "CUSTOM");
+    state.setProp(KAFKA_WORKUNIT_PACKER_CUSTOMIZED_TYPE,
+        KafkaSingleLevelWorkUnitPacker.class.getName());
+    state.setProp(KAFKA_WORKUNIT_SIZE_ESTIMATOR_TYPE, "CUSTOM");
+    state.setProp(KAFKA_WORKUNIT_SIZE_ESTIMATOR_CUSTOMIZED_TYPE,
+        KafkaAvgRecordTimeBasedWorkUnitSizeEstimator.class.getName());
+    packer = new TestKafkaWorkUnitPacker(source, state);
+  }
+
+  @Test
+  public void testGetWorkUnitSizeEstimator() {
+    KafkaWorkUnitSizeEstimator estimator = packer.getWorkUnitSizeEstimator();
+    Assert.assertEquals(estimator.getClass(), KafkaAvgRecordTimeBasedWorkUnitSizeEstimator.class);
+  }
+
+  @Test
+  public void testGetInstance() {
+    KafkaWorkUnitPacker anotherPacker = KafkaWorkUnitPacker.getInstance(source, state);
+    Assert.assertEquals(anotherPacker.getClass(), KafkaSingleLevelWorkUnitPacker.class);
+  }
+
+  public static class TestKafkaWorkUnitPacker extends KafkaWorkUnitPacker {
+    public TestKafkaWorkUnitPacker(AbstractSource<?, ?> source, SourceState state) {
+      super(source, state);
+    }
+
+    // Never invoked by these tests; implemented only so the abstract packer can be instantiated.
+    @Override
+    public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, int numContainers) {
+      return null;
+    }
+  }
+}