Repository: incubator-gobblin Updated Branches: refs/heads/master 707c1e4e5 -> 6160adca2
[GOBBLIN-259] Support writing Kafka messages to db/table file path Closes #2111 from zxcware/dbtable Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6160adca Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6160adca Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6160adca Branch: refs/heads/master Commit: 6160adca2b4c14eeabfc7b9d8484fd25dd47d6ee Parents: 707c1e4 Author: zhchen <[email protected]> Authored: Wed Sep 20 13:50:20 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Wed Sep 20 13:50:20 2017 -0700 ---------------------------------------------------------------------- .../gobblin/source/workunit/WorkUnit.java | 14 +++++++++ .../extractor/extract/kafka/KafkaSource.java | 7 +++-- .../org/apache/gobblin/util/WriterUtils.java | 31 +++++++++++++++++--- .../apache/gobblin/util/WriterUtilsTest.java | 12 +++++++- 4 files changed, 57 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6160adca/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java index 5df29a6..38aabcb 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/WorkUnit.java @@ -304,6 +304,20 @@ public class WorkUnit extends State { return getPropAsLong(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY); } + @Override + public boolean contains(String key) { + return super.contains(key) || this.extract.contains(key); + } + + @Override + public String getProp(String key) { + String value = 
super.getProp(key); + if (value == null) { + value = this.extract.getProp(key); + } + return value; + } + /** * Set the low watermark of this {@link WorkUnit}. * http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6160adca/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java index 606be62..56f81e1 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java @@ -518,6 +518,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { // Default to job level configurations Extract.TableType currentTableType = tableType; String currentExtractNamespace = extractNamespace; + String currentExtractTableName = partition.getTopicName(); boolean isCurrentFullExtract = isFullExtract; // Update to topic specific configurations if any if (topicSpecificState.isPresent()) { @@ -526,10 +527,12 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { currentTableType = Extract.TableType.valueOf(topicState.getProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY)); } currentExtractNamespace = topicState.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, extractNamespace); + currentExtractTableName = + topicState.getProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, partition.getTopicName()); isCurrentFullExtract = topicState.getPropAsBoolean(ConfigurationKeys.EXTRACT_IS_FULL_KEY, isFullExtract); } - Extract extract = this.createExtract(currentTableType, currentExtractNamespace, 
partition.getTopicName()); + Extract extract = this.createExtract(currentTableType, currentExtractNamespace, currentExtractTableName); if (isCurrentFullExtract) { extract.setProp(ConfigurationKeys.EXTRACT_IS_FULL_KEY, true); } @@ -538,9 +541,9 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> { if (topicSpecificState.isPresent()) { workUnit.addAll(topicSpecificState.get()); } + workUnit.setProp(TOPIC_NAME, partition.getTopicName()); addDatasetUrnOptionally(workUnit); - workUnit.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, partition.getTopicName()); workUnit.setProp(PARTITION_ID, partition.getId()); workUnit.setProp(LEADER_ID, partition.getLeader().getId()); workUnit.setProp(LEADER_HOSTANDPORT, partition.getLeader().getHostAndPort().toString()); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6160adca/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java index f174979..bb5f495 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/WriterUtils.java @@ -57,12 +57,19 @@ public class WriterUtils { public static final Config NO_RETRY_CONFIG = ConfigFactory.empty(); - /** - * TABLENAME should be used for jobs that pull from multiple tables/topics and intend to write the records - * in each table/topic to a separate folder. Otherwise, DEFAULT can be used. - */ public enum WriterFilePathType { + /** + * Write records into namespace/table folder. If namespace has multiple components, each component will be + * a folder in the path. 
For example: the writer file path for namespace 'org.apache.gobblin' and table 'tableName' + * will be 'org/apache/gobblin/tableName' + */ + NAMESPACE_TABLE, + /** + * TABLENAME should be used for jobs that pull from multiple tables/topics and intend to write the records + * in each table/topic to a separate folder. + */ TABLENAME, + /** Write records into the output file decided by {@link org.apache.gobblin.source.workunit.Extract}*/ DEFAULT } @@ -156,6 +163,8 @@ public class WriterUtils { } switch (getWriterFilePathType(state)) { + case NAMESPACE_TABLE: + return getNamespaceTableWriterFilePath(state); case TABLENAME: return WriterUtils.getTableNameWriterFilePath(state); default: @@ -170,6 +179,20 @@ public class WriterUtils { } /** + * Creates {@link Path} for case {@link WriterFilePathType#NAMESPACE_TABLE} with configurations + * {@link ConfigurationKeys#EXTRACT_NAMESPACE_NAME_KEY} and {@link ConfigurationKeys#EXTRACT_TABLE_NAME_KEY} + * @param state + * @return a path + */ + public static Path getNamespaceTableWriterFilePath(State state) { + Preconditions.checkArgument(state.contains(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY)); + Preconditions.checkArgument(state.contains(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY)); + + String namespace = state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY).replaceAll("\\.", Path.SEPARATOR); + return new Path( namespace + Path.SEPARATOR + state.getProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY)); + } + + /** * Creates {@link Path} for the {@link ConfigurationKeys#WRITER_FILE_PATH} key according to * {@link ConfigurationKeys#EXTRACT_TABLE_NAME_KEY}.
* @param state http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6160adca/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java index 820b5a3..e14972f 100644 --- a/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java +++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/WriterUtilsTest.java @@ -93,13 +93,23 @@ public class WriterUtilsTest { @Test public void testGetWriterFilePath() { - WorkUnit state = WorkUnit.createEmpty(); + Extract extract = new Extract(TableType.SNAPSHOT_ONLY, "org.apache.gobblin.dbNamespace", "tableName"); + WorkUnit state = WorkUnit.create(extract); state.setProp(ConfigurationKeys.WRITER_FILE_PATH, TEST_WRITER_FILE_PATH); Assert.assertEquals(WriterUtils.getWriterFilePath(state, 0, 0), TEST_WRITER_FILE_PATH); state.setProp(ConfigurationKeys.WRITER_FILE_PATH + ".0", TEST_WRITER_FILE_PATH); Assert.assertEquals(WriterUtils.getWriterFilePath(state, 1, 1), TEST_WRITER_FILE_PATH); + + state.removeProp(ConfigurationKeys.WRITER_FILE_PATH); + + state.setProp(ConfigurationKeys.WRITER_FILE_PATH_TYPE, "tablename"); + Assert.assertEquals(WriterUtils.getWriterFilePath(state, 0, 0), new Path("tableName")); + + state.setProp(ConfigurationKeys.WRITER_FILE_PATH_TYPE, "namespace_table"); + Assert.assertEquals(WriterUtils.getWriterFilePath(state, 0, 0), + new Path("org/apache/gobblin/dbNamespace/tableName")); } @Test
