This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new edd718e [GOBBLIN-1336] Implement a streaming version of TimePartitionedDataPublisher
edd718e is described below
commit edd718e8a562632046f5831c0e4b6b93e22ba0e7
Author: suvasude <[email protected]>
AuthorDate: Mon Feb 1 16:20:38 2021 -0800
[GOBBLIN-1336] Implement a streaming version of TimePartitionedDataPublisher
GOBBLIN-1336: Implement a streaming version of TimePartitionedDataPublisher
Introduce a config key for final dataset publish
location
Closes #3173 from sv2000/streamingDataPublisher
---
.../gobblin/configuration/ConfigurationKeys.java | 6 +
.../TimePartitionedStreamingDataPublisher.java | 164 +++++++++++++++++++++
.../TimePartitionedStreamingDataPublisherTest.java | 84 +++++++++++
3 files changed, 254 insertions(+)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 9830e46..b86860d 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -501,6 +501,12 @@ public class ConfigurationKeys {
DATA_PUBLISHER_PREFIX + ".latest.file.arrival.timestamp";
/**
+ * Dynamically configured Publisher properties used internally
+ */
+ //Dataset-specific final publish location
+ public static final String DATA_PUBLISHER_DATASET_DIR =
DATA_PUBLISHER_PREFIX + ".dataset.dir";
+
+ /**
* Configuration properties used by the extractor.
*/
public static final String SOURCE_ENTITY = "source.entity";
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisher.java
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisher.java
new file mode 100644
index 0000000..379cd14
--- /dev/null
+++
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisher.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.publisher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.util.ForkOperatorUtils;
+import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.util.WriterUtils;
+
+
+@Slf4j
+public class TimePartitionedStreamingDataPublisher extends
TimePartitionedDataPublisher {
+ private final MetricContext metricContext;
+ public TimePartitionedStreamingDataPublisher(State state) throws IOException
{
+ super(state);
+ this.metricContext = Instrumented.getMetricContext(state,
TimePartitionedStreamingDataPublisher.class);
+ }
+
+ /**
+ * This method publishes task output data for the given {@link
WorkUnitState}, but if there are output data of
+ * other tasks in the same folder, it may also publish those data.
+ */
+ protected void publishMultiTaskData(WorkUnitState state, int branchId,
Set<Path> writerOutputPathsMoved)
+ throws IOException {
+ state.setProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR,
this.getPublisherOutputDir(state, branchId).toString());
+ super.publishMultiTaskData(state, branchId, writerOutputPathsMoved);
+ }
+
+ @Override
+ protected void publishData(WorkUnitState state, int branchId, boolean
publishSingleTaskData,
+ Set<Path> writerOutputPathsMoved) throws IOException {
+ // The directory where the workUnitState wrote its output data.
+ Path writerOutputDir = WriterUtils.getWriterOutputDir(state,
this.numBranches, branchId);
+
+ if
(!this.writerFileSystemByBranches.get(branchId).exists(writerOutputDir)) {
+ log.warn(String.format("Branch %d of WorkUnit %s produced no data",
branchId, state.getId()));
+ return;
+ }
+ // The directory where the final output directory for this job will be
placed.
+ // It is a combination of DATA_PUBLISHER_FINAL_DIR and WRITER_FILE_PATH.
+ Path publisherOutputDir = getPublisherOutputDir(state, branchId);
+
+ if
(!this.publisherFileSystemByBranches.get(branchId).exists(publisherOutputDir)) {
+ // Create the directory of the final output directory if it does not
exist before we do the actual publish
+ // This is used to force the publisher save recordPublisherOutputDirs as
the granularity to be parent of new file paths
+ // which will be used to do hive registration
+
WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
+ publisherOutputDir, this.permissions.get(branchId), retrierConfig);
+ }
+ super.publishData(state, branchId, publishSingleTaskData,
writerOutputPathsMoved);
+ }
+
+ @Override
+ public void publishData(Collection<? extends WorkUnitState> states) throws
IOException {
+ publishDataImpl(states);
+ //Clean up state to remove filenames which have been committed from the
state object
+ wusCleanUp(states);
+ }
+
+ public void publishDataImpl(Collection<? extends WorkUnitState> states)
throws IOException {
+
+ // We need a Set to collect unique writer output paths as multiple tasks
may belong to the same extract. Tasks that
+ // belong to the same Extract will by default have the same output
directory
+ Set<Path> writerOutputPathsMoved = Sets.newHashSet();
+
+ for (WorkUnitState workUnitState : states) {
+ for (int branchId = 0; branchId < this.numBranches; branchId++) {
+ publishMultiTaskData(workUnitState, branchId, writerOutputPathsMoved);
+ }
+ }
+
+ //Wait for any submitted ParallelRunner threads to finish
+ for (ParallelRunner runner : this.parallelRunners.values()) {
+ runner.waitForTasks();
+ }
+
+ for (WorkUnitState workUnitState : states) {
+ // Upon successfully committing the data to the final output directory,
set states
+ // of successful tasks to COMMITTED. leaving states of unsuccessful ones
unchanged.
+ // This makes sense to the COMMIT_ON_PARTIAL_SUCCESS policy.
+ workUnitState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+ }
+
+ ArrayList<WorkUnitState> statesWithLineage = Lists.newArrayList();
+
+ for (WorkUnitState state: states) {
+ if (LineageInfo.hasLineageInfo(state)) {
+ statesWithLineage.add(state);
+ }
+ }
+ long startTime = System.currentTimeMillis();
+ submitLineageEvents(statesWithLineage);
+ log.info("Emitting lineage events took {} millis",
System.currentTimeMillis() - startTime);
+ }
+
+ private void submitLineageEvents(Collection<? extends WorkUnitState> states)
{
+ for (Map.Entry<String, Collection<WorkUnitState>> entry :
LineageInfo.aggregateByLineageEvent(states).entrySet()) {
+ LineageInfo.submitLineageEvent(entry.getKey(), entry.getValue(),
metricContext);
+ }
+ }
+
+ /**
+ * A helper method to clean up {@link WorkUnitState}.
+ * @param states
+ */
+ protected void wusCleanUp(Collection<? extends WorkUnitState> states) {
+ // use the first work unit state to get common properties
+ WorkUnitState wuState = states.stream().findFirst().get();
+ int numBranches = wuState.getPropAsInt("fork.branches", 1);
+
+ // clean up state kept for data publishing
+ for (WorkUnitState state : states) {
+ for (int branchId = 0; branchId < numBranches; branchId++) {
+ String outputFilePropName =
+
ForkOperatorUtils.getPropertyNameForBranch(ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS,
numBranches,
+ branchId);
+
+ if (state.contains(outputFilePropName)) {
+ state.removeProp(outputFilePropName);
+ }
+
+ LineageInfo.removeDestinationProp(state, branchId);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ Set<Path> getPublishOutputDirs() {
+ return this.publisherOutputDirs;
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisherTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisherTest.java
new file mode 100644
index 0000000..c88ce8b
--- /dev/null
+++
b/gobblin-core/src/test/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisherTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.publisher;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+@Test
+public class TimePartitionedStreamingDataPublisherTest {
+ File tmpDir = Files.createTempDir();
+ File publishDir = new File(tmpDir, "/publish");
+
+ /**
+ * Test when publish output dir does not exist,
+ * it can still record the PublishOutputDirs in right format
+ * @throws IOException
+ */
+ public void testPublishMultiTasks()
+ throws IOException {
+ WorkUnitState state1 = buildTaskState(2);
+ WorkUnitState state2 = buildTaskState(2);
+ TimePartitionedStreamingDataPublisher publisher = new
TimePartitionedStreamingDataPublisher(state1);
+ Assert.assertFalse(publishDir.exists());
+ publisher.publishData(ImmutableList.of(state1, state2));
+ Assert.assertTrue(publisher.getPublishOutputDirs().contains(new Path(
+ state1.getProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR + "." + 1)
+ + "/namespace/table/hourly/2020/04/01/12")));
+ Assert.assertTrue(publisher.getPublishOutputDirs().contains(new Path(
+ state1.getProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR + "." + 0)
+ + "/namespace/table/hourly/2020/04/01/12")));
+ Assert.assertEquals(publisher.getPublishOutputDirs().size(), 2);
+ }
+
+ private WorkUnitState buildTaskState(int numBranches) throws IOException{
+ WorkUnitState state = new WorkUnitState(WorkUnit.createEmpty(), new
State());
+
+ state.setProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, "namespace");
+ state.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, "table");
+ state.setProp(ConfigurationKeys.WRITER_FILE_PATH_TYPE, "namespace_table");
+ state.setProp(ConfigurationKeys.FORK_BRANCHES_KEY, numBranches);
+ state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR,
publishDir.toString());
+ state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, tmpDir.toString());
+ if (numBranches > 1) {
+ for (int i = 0; i < numBranches; i++) {
+ state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR + "." + i,
publishDir.toString() + "/branch" + i);
+ state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR + "." + i,
tmpDir.toString() + "/branch" + i);
+ Files.createParentDirs(new
File(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR + "." + i)
+ + "/namespace/table/hourly/2020/04/01/12/data.avro"));
+ new File(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR + "." + i)
+ +
"/namespace/table/hourly/2020/04/01/12/data.avro").createNewFile();
+ }
+ }
+
+ return state;
+ }
+}
\ No newline at end of file