This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new edd718e  [GOBBLIN-1336] Implement a streaming version of 
TimePartitionedDataPub…
edd718e is described below

commit edd718e8a562632046f5831c0e4b6b93e22ba0e7
Author: suvasude <[email protected]>
AuthorDate: Mon Feb 1 16:20:38 2021 -0800

    [GOBBLIN-1336] Implement a streaming version of TimePartitionedDataPub…
    
    GOBBLIN-1336: Implement a streaming version of
    TimePartitionedDataPublisher
    
    Introduce a config key for final dataset publish
    location
    
    Closes #3173 from sv2000/streamingDataPublisher
---
 .../gobblin/configuration/ConfigurationKeys.java   |   6 +
 .../TimePartitionedStreamingDataPublisher.java     | 164 +++++++++++++++++++++
 .../TimePartitionedStreamingDataPublisherTest.java |  84 +++++++++++
 3 files changed, 254 insertions(+)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 9830e46..b86860d 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -501,6 +501,12 @@ public class ConfigurationKeys {
       DATA_PUBLISHER_PREFIX + ".latest.file.arrival.timestamp";
 
   /**
+   * Dynamically configured Publisher properties used internally
+   */
+  // Dataset-specific final publish location; set by the publisher at publish time, not by user configuration
+  public static final String DATA_PUBLISHER_DATASET_DIR = 
DATA_PUBLISHER_PREFIX + ".dataset.dir";
+
+  /**
    * Configuration properties used by the extractor.
    */
   public static final String SOURCE_ENTITY = "source.entity";
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisher.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisher.java
new file mode 100644
index 0000000..379cd14
--- /dev/null
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisher.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.publisher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.util.ForkOperatorUtils;
+import org.apache.gobblin.util.ParallelRunner;
+import org.apache.gobblin.util.WriterUtils;
+
+
+@Slf4j
+public class TimePartitionedStreamingDataPublisher extends 
TimePartitionedDataPublisher {
+  // Metric context handed to LineageInfo when lineage events are submitted.
+  private final MetricContext metricContext;
+  /**
+   * Creates the publisher by delegating to the parent constructor and then looking up the
+   * {@link MetricContext} for this class from the given state.
+   *
+   * @param state state passed to the parent publisher and to {@link Instrumented#getMetricContext}
+   * @throws IOException declared because the parent constructor call may throw it
+   */
+  public TimePartitionedStreamingDataPublisher(State state) throws IOException {
+    super(state);
+    this.metricContext = Instrumented.getMetricContext(state, TimePartitionedStreamingDataPublisher.class);
+  }
+
+  /**
+   * Publishes task output data for the given {@link WorkUnitState}. If output data of other tasks
+   * lives in the same folder, that data may be published as well.
+   *
+   * <p>Before delegating to the parent implementation, the publisher output directory of the branch is
+   * recorded on the state under {@link ConfigurationKeys#DATA_PUBLISHER_DATASET_DIR}.
+   *
+   * @param state work unit state whose output is published; it is mutated to carry the dataset directory
+   * @param branchId fork branch being published
+   * @param writerOutputPathsMoved writer output paths tracked across calls; passed through to the parent
+   * @throws IOException if the parent publish call fails
+   */
+  @Override
+  protected void publishMultiTaskData(WorkUnitState state, int branchId, Set<Path> writerOutputPathsMoved)
+      throws IOException {
+    Path datasetDir = this.getPublisherOutputDir(state, branchId);
+    state.setProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, datasetDir.toString());
+    super.publishMultiTaskData(state, branchId, writerOutputPathsMoved);
+  }
+
+  /**
+   * Publishes one branch of a work unit after making sure its final output directory exists.
+   *
+   * <p>If the writer output directory of the branch does not exist, a warning is logged and nothing is
+   * published. Otherwise the publisher output directory is created when missing, and the actual publish
+   * is delegated to the parent implementation.
+   *
+   * @param state work unit state being published
+   * @param branchId fork branch being published
+   * @param publishSingleTaskData passed through unchanged to the parent implementation
+   * @param writerOutputPathsMoved passed through unchanged to the parent implementation
+   * @throws IOException if a file system call or the parent publish call fails
+   */
+  @Override
+  protected void publishData(WorkUnitState state, int branchId, boolean publishSingleTaskData,
+      Set<Path> writerOutputPathsMoved) throws IOException {
+    // Location the work unit wrote its output data to.
+    Path writerDir = WriterUtils.getWriterOutputDir(state, this.numBranches, branchId);
+    boolean writerDirExists = this.writerFileSystemByBranches.get(branchId).exists(writerDir);
+    if (!writerDirExists) {
+      log.warn(String.format("Branch %d of WorkUnit %s produced no data", branchId, state.getId()));
+      return;
+    }
+
+    // Final output location for this job: a combination of DATA_PUBLISHER_FINAL_DIR and WRITER_FILE_PATH.
+    Path finalDir = getPublisherOutputDir(state, branchId);
+    boolean finalDirExists = this.publisherFileSystemByBranches.get(branchId).exists(finalDir);
+    if (!finalDirExists) {
+      // Create the final output directory up front, before the actual publish. Per the original author's
+      // note, this makes the publisher record its output directories at the granularity of the parent of
+      // the new file paths, which is later used for Hive registration.
+      WriterUtils.mkdirsWithRecursivePermissionWithRetry(this.publisherFileSystemByBranches.get(branchId),
+          finalDir, this.permissions.get(branchId), retrierConfig);
+    }
+
+    super.publishData(state, branchId, publishSingleTaskData, writerOutputPathsMoved);
+  }
+
+  /**
+   * Publishes the given states via {@link #publishDataImpl(Collection)} and then calls
+   * {@link #wusCleanUp(Collection)} on the same states.
+   *
+   * @param states work unit states to publish
+   * @throws IOException if publishing fails
+   */
+  @Override
+  public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
+    publishDataImpl(states);
+    // Clean up the state objects by removing the file names that have now been committed
+    wusCleanUp(states);
+  }
+
+  /**
+   * Publishes every branch of every given state, waits for the parallel runners, marks all states
+   * {@code COMMITTED}, and finally submits lineage events for the states that carry lineage info.
+   *
+   * @param states work unit states to publish
+   * @throws IOException if publishing a branch or waiting on a runner fails
+   */
+  public void publishDataImpl(Collection<? extends WorkUnitState> states) throws IOException {
+    // Tasks that belong to the same Extract share an output directory by default, so the unique writer
+    // output paths are tracked in one Set across all tasks.
+    Set<Path> movedWriterPaths = Sets.newHashSet();
+    for (WorkUnitState wus : states) {
+      for (int branch = 0; branch < this.numBranches; branch++) {
+        publishMultiTaskData(wus, branch, movedWriterPaths);
+      }
+    }
+
+    // Block until the tasks submitted to each ParallelRunner have finished.
+    for (ParallelRunner parallelRunner : this.parallelRunners.values()) {
+      parallelRunner.waitForTasks();
+    }
+
+    // Reaching this point means nothing above threw, so every state in the collection is marked COMMITTED.
+    states.forEach(wus -> wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED));
+
+    // Only states that carry lineage info take part in lineage event submission.
+    ArrayList<WorkUnitState> lineageStates = Lists.newArrayList();
+    for (WorkUnitState wus : states) {
+      if (!LineageInfo.hasLineageInfo(wus)) {
+        continue;
+      }
+      lineageStates.add(wus);
+    }
+
+    long lineageStart = System.currentTimeMillis();
+    submitLineageEvents(lineageStates);
+    long elapsedMillis = System.currentTimeMillis() - lineageStart;
+    log.info("Emitting lineage events took {} millis", elapsedMillis);
+  }
+
+  /**
+   * Groups the given states with {@link LineageInfo#aggregateByLineageEvent} and submits one lineage
+   * event per group using this publisher's metric context.
+   *
+   * @param states work unit states to submit lineage events for
+   */
+  private void submitLineageEvents(Collection<? extends WorkUnitState> states) {
+    Map<String, Collection<WorkUnitState>> statesByKey = LineageInfo.aggregateByLineageEvent(states);
+    statesByKey.forEach((lineageKey, groupedStates) ->
+        LineageInfo.submitLineageEvent(lineageKey, groupedStates, metricContext));
+  }
+
+  /**
+   * Removes from each {@link WorkUnitState} the properties that were kept only for data publishing: the
+   * per-branch {@link ConfigurationKeys#WRITER_FINAL_OUTPUT_FILE_PATHS} property and the lineage
+   * destination property.
+   *
+   * <p>The branch count is read from the first state and applied to all of them. An empty collection is
+   * a no-op.
+   *
+   * @param states work unit states to clean up; may be empty
+   */
+  protected void wusCleanUp(Collection<? extends WorkUnitState> states) {
+    if (states.isEmpty()) {
+      // Nothing to clean up. Without this guard the lookup of the first state below would throw.
+      return;
+    }
+
+    // use the first work unit state to get common properties
+    WorkUnitState firstState = states.iterator().next();
+    // Local on purpose: the branch count is taken from the state itself, as in the original code.
+    int numBranches = firstState.getPropAsInt(ConfigurationKeys.FORK_BRANCHES_KEY, 1);
+
+    // clean up state kept for data publishing
+    for (WorkUnitState state : states) {
+      for (int branchId = 0; branchId < numBranches; branchId++) {
+        String outputFilePropName = ForkOperatorUtils.getPropertyNameForBranch(
+            ConfigurationKeys.WRITER_FINAL_OUTPUT_FILE_PATHS, numBranches, branchId);
+
+        if (state.contains(outputFilePropName)) {
+          state.removeProp(outputFilePropName);
+        }
+
+        LineageInfo.removeDestinationProp(state, branchId);
+      }
+    }
+  }
+
+  /**
+   * Exposes the publisher output directories recorded by this publisher so tests can inspect them.
+   *
+   * @return the {@code publisherOutputDirs} set itself, not a copy
+   */
+  @VisibleForTesting
+  Set<Path> getPublishOutputDirs() {
+    return this.publisherOutputDirs;
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisherTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisherTest.java
new file mode 100644
index 0000000..c88ce8b
--- /dev/null
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/publisher/TimePartitionedStreamingDataPublisherTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.publisher;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+
+@Test
+public class TimePartitionedStreamingDataPublisherTest {
+  File tmpDir = Files.createTempDir();
+  File publishDir = new File(tmpDir, "/publish");
+
+  /**
+   * Verifies that when the publish output dir does not exist beforehand, the publisher
+   * still records its output dirs in the expected per-branch format.
+   * @throws IOException
+   */
+  public void testPublishMultiTasks()
+      throws IOException {
+    WorkUnitState firstState = buildTaskState(2);
+    WorkUnitState secondState = buildTaskState(2);
+    TimePartitionedStreamingDataPublisher publisher = new TimePartitionedStreamingDataPublisher(firstState);
+    Assert.assertFalse(publishDir.exists());
+
+    publisher.publishData(ImmutableList.of(firstState, secondState));
+
+    // Both branches must have recorded their partition directory under their own final dir.
+    for (int branchId : new int[] {1, 0}) {
+      Path expectedDir = new Path(
+          firstState.getProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR + "." + branchId)
+              + "/namespace/table/hourly/2020/04/01/12");
+      Assert.assertTrue(publisher.getPublishOutputDirs().contains(expectedDir));
+    }
+    Assert.assertEquals(publisher.getPublishOutputDirs().size(), 2);
+  }
+
+  /**
+   * Builds a {@link WorkUnitState} configured for the "namespace"/"table" extract. When there is more
+   * than one branch, it also sets per-branch publish and writer dirs and creates an empty
+   * {@code data.avro} file under each branch's writer dir.
+   *
+   * @param numBranches number of fork branches to configure
+   * @return the configured state
+   * @throws IOException if a directory or file cannot be created
+   */
+  private WorkUnitState buildTaskState(int numBranches) throws IOException {
+    WorkUnitState taskState = new WorkUnitState(WorkUnit.createEmpty(), new State());
+
+    taskState.setProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, "namespace");
+    taskState.setProp(ConfigurationKeys.EXTRACT_TABLE_NAME_KEY, "table");
+    taskState.setProp(ConfigurationKeys.WRITER_FILE_PATH_TYPE, "namespace_table");
+    taskState.setProp(ConfigurationKeys.FORK_BRANCHES_KEY, numBranches);
+    taskState.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, publishDir.toString());
+    taskState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, tmpDir.toString());
+
+    // With a single branch there are no per-branch properties or files to set up.
+    if (numBranches <= 1) {
+      return taskState;
+    }
+
+    for (int branch = 0; branch < numBranches; branch++) {
+      String publishKey = ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR + "." + branch;
+      String writerKey = ConfigurationKeys.WRITER_OUTPUT_DIR + "." + branch;
+      taskState.setProp(publishKey, publishDir.toString() + "/branch" + branch);
+      taskState.setProp(writerKey, tmpDir.toString() + "/branch" + branch);
+
+      // Create an empty data file in the branch's writer output, including its parent directories.
+      File dataFile = new File(taskState.getProp(writerKey)
+          + "/namespace/table/hourly/2020/04/01/12/data.avro");
+      Files.createParentDirs(dataFile);
+      dataFile.createNewFile();
+    }
+
+    return taskState;
+  }
+}
\ No newline at end of file

Reply via email to