This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 46e3cf5  [GOBBLIN-1377] Adds functionality to create shards for target 
directories in hive distcp
46e3cf5 is described below

commit 46e3cf5da54fd38829653c8af167e128a67e3723
Author: William Lo <[email protected]>
AuthorDate: Thu Mar 18 12:54:03 2021 -0700

    [GOBBLIN-1377] Adds functionality to create shards for target directories 
in hive distcp
    
    Adds handler to create and manage shards
    
    addresses review
    
    sets dataset path to private
    
    addresses last review
    
    fix tests
    
    Closes #3158 from Will-Lo/target-directory-
    sharding-hive-distcp
---
 .../gobblin/configuration/ConfigurationKeys.java   |  11 +-
 .../destination/DestinationDatasetHandler.java     |  25 ++-
 .../DestinationDatasetHandlerFactory.java          |  37 +++++
 .../DestinationDatasetHandlerService.java          |  78 +++++++++
 .../DestinationDatasetHandlerServiceTest.java      | 115 +++++++++++++
 .../destination/TestDestinationDatasetHandler.java |  28 +++-
 .../gobblin/data/management/copy/CopySource.java   |  12 +-
 .../data/management/copy/CopyableDatasetBase.java  |   1 +
 .../management/copy/hive/HiveCopyEntityHelper.java | 184 ++++++++++++---------
 .../data/management/copy/hive/HiveDataset.java     |  12 +-
 .../management/copy/hive/HivePartitionFileSet.java |   2 +-
 .../copy/publisher/CopyDataPublisher.java          |  19 ++-
 .../writer/FileAwareInputStreamDataWriter.java     |  14 --
 .../data/management/copy/CopySourceTest.java       |  43 ++++-
 .../management/copy/MockHiveDatasetFinder.java     |  93 +++++++++++
 .../copy/hive/HiveCopyEntityHelperTest.java        |  71 ++++++++
 .../copy/publisher/CopyDataPublisherTest.java      |   6 +-
 .../publisher/DeletingCopyDataPublisherTest.java   |   6 +-
 .../gobblin/runtime/AbstractJobLauncher.java       |   5 +
 .../org/apache/gobblin/util/JobLauncherUtils.java  |   9 +-
 20 files changed, 630 insertions(+), 141 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index abff79e..12f1a76 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -210,7 +210,6 @@ public class ConfigurationKeys {
   public static final String MAXIMUM_JAR_COPY_RETRY_TIMES_KEY = 
JOB_JAR_FILES_KEY + ".uploading.retry.maximum";
   public static final String USER_DEFINED_STATIC_STAGING_DIR = 
"user.defined.static.staging.dir";
   public static final String USER_DEFINED_STAGING_DIR_FLAG = 
"user.defined.staging.dir.flag";
-  public static final String IS_DATASET_STAGING_DIR_USED = 
"dataset.staging.dir.used";
 
   public static final String QUEUED_TASK_TIME_MAX_SIZE = 
"taskexecutor.queued_task_time.history.max_size";
   public static final int DEFAULT_QUEUED_TASK_TIME_MAX_SIZE = 2048;
@@ -1062,4 +1061,14 @@ public class ConfigurationKeys {
    */
   public static final String TASK_EVENT_METADATA_GENERATOR_CLASS_KEY = 
"gobblin.task.event.metadata.generator.class";
   public static final String DEFAULT_TASK_EVENT_METADATA_GENERATOR_CLASS_KEY = 
"nooptask";
+
+  /**
+   * Configuration for sharded directory files
+   */
+  public static final String USE_DATASET_LOCAL_WORK_DIR = 
"gobblin.useDatasetLocalWorkDir";
+  public static final String DESTINATION_DATASET_HANDLER_CLASS = 
"gobblin.destination.datasetHandlerClass";
+  public static final String DATASET_DESTINATION_PATH = 
"gobblin.dataset.destination.path";
+  public static final String STAGING_DIR_DEFAULT_SUFFIX = "/.temp/taskStaging";
+  public static final String OUTPUT_DIR_DEFAULT_SUFFIX = "/.temp/taskOutput";
+  public static final String ROW_LEVEL_ERR_FILE_DEFAULT_SUFFIX = "/err";
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandler.java
similarity index 56%
copy from 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
copy to 
gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandler.java
index c27b839..758634d 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandler.java
@@ -15,14 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.data.management.copy;
-
-import org.apache.gobblin.dataset.Dataset;
+package org.apache.gobblin.destination;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.gobblin.source.workunit.WorkUnit;
 
 /**
- * A common superinterface for {@link Dataset}s that can be operated on by 
distcp.
- * Concrete classes must implement a subinterface of this interface ({@link 
CopyableDataset} or {@link IterableCopyableDataset}).
+ * Performs work related to initializing the target environment before the 
files are written and published
  */
-public interface CopyableDatasetBase extends Dataset {
+public interface DestinationDatasetHandler extends Closeable {
+
+  /**
+   * Handle destination setup before workunits are sent to writer and publisher
+   * @param workUnits
+   */
+  void handle(Collection<WorkUnit> workUnits) throws IOException;
+
+  /**
+   * Perform cleanup if needed
+   * @throws IOException
+   */
+  void close() throws IOException;
 }
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerFactory.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerFactory.java
new file mode 100644
index 0000000..f3f6831
--- /dev/null
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.destination;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+public class DestinationDatasetHandlerFactory  {
+
+  public static DestinationDatasetHandler newInstance(String handlerTypeName, 
SourceState state, Boolean canCleanUp) {
+    try {
+      ClassAliasResolver<DestinationDatasetHandler> aliasResolver = new 
ClassAliasResolver<>(DestinationDatasetHandler.class);
+      DestinationDatasetHandler handler = 
GobblinConstructorUtils.invokeLongestConstructor(
+          aliasResolver.resolveClass(handlerTypeName), state, canCleanUp);
+      return handler;
+    } catch (ReflectiveOperationException rfe) {
+      throw new RuntimeException("Could not construct 
DestinationDatasetHandler " + handlerTypeName, rfe);
+    }
+  }
+}
diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerService.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerService.java
new file mode 100644
index 0000000..41c768d
--- /dev/null
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerService.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.destination;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.apache.gobblin.util.JobLauncherUtils;
+
+
+/**
+ * Initializes and runs handlers on workunits before writers are initialized
+ * Reads {@link ConfigurationKeys#DESTINATION_DATASET_HANDLER_CLASS} as a list
+ * of classes, separated by comma to initialize the handlers
+ */
+public class DestinationDatasetHandlerService implements Closeable {
+  List<DestinationDatasetHandler> handlers;
+
+  public DestinationDatasetHandlerService(SourceState jobState, Boolean 
canCleanUp, EventSubmitter eventSubmitter) {
+    this.handlers = new ArrayList<>();
+    if 
(jobState.contains(ConfigurationKeys.DESTINATION_DATASET_HANDLER_CLASS)) {
+      List<String> handlerList = 
jobState.getPropAsList(ConfigurationKeys.DESTINATION_DATASET_HANDLER_CLASS);
+      for (String handlerClass : handlerList) {
+        
this.handlers.add(DestinationDatasetHandlerFactory.newInstance(handlerClass, 
jobState, canCleanUp));
+      }
+    }
+  }
+
+  /**
+   * Executes handlers
+   * @param workUnitStream
+   */
+  public void executeHandlers(WorkUnitStream workUnitStream) {
+    if (handlers.size() > 0) {
+      if (workUnitStream.isSafeToMaterialize()) {
+        Collection<WorkUnit> workUnits = 
JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
+          for (DestinationDatasetHandler handler : this.handlers) {
+            try {
+              handler.handle(workUnits);
+            } catch (IOException e) {
+              throw new RuntimeException(String.format("Handler %s failed to 
execute", handler.getClass().getName()), e);
+            }
+          }
+      } else {
+        throw new 
RuntimeException(DestinationDatasetHandlerService.class.getName() + " does not 
support work unit streams");
+      }
+    }
+  }
+
+
+  public void close() throws IOException {
+    for (DestinationDatasetHandler handler: this.handlers) {
+      handler.close();
+    }
+  }
+}
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/destination/DestinationDatasetHandlerServiceTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/destination/DestinationDatasetHandlerServiceTest.java
new file mode 100644
index 0000000..5991cb8
--- /dev/null
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/destination/DestinationDatasetHandlerServiceTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.destination;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.testng.Assert;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
+
+
+public class DestinationDatasetHandlerServiceTest {
+
+  EventSubmitter eventSubmitter = null;
+
+  @BeforeSuite
+  void setup() {
+    this.eventSubmitter = mock(EventSubmitter.class);
+  }
+
+  @Test
+  void testSingleHandler() throws Exception {
+    SourceState state = new SourceState();
+    state.setProp(ConfigurationKeys.DESTINATION_DATASET_HANDLER_CLASS, 
TestDestinationDatasetHandler.class.getName());
+    DestinationDatasetHandlerService service = new 
DestinationDatasetHandlerService(state, true, this.eventSubmitter);
+    List<WorkUnit> workUnits = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      WorkUnit wu = WorkUnit.createEmpty();
+      workUnits.add(wu);
+    }
+    WorkUnitStream workUnitStream = new 
BasicWorkUnitStream.Builder(workUnits).build();
+    service.executeHandlers(workUnitStream);
+
+    for (WorkUnit wu: workUnits) {
+      
Assert.assertEquals(wu.getPropAsInt(TestDestinationDatasetHandler.TEST_COUNTER_KEY),
 1);
+    }
+  }
+
+  @Test
+  void testMultipleHandlers() throws Exception {
+    SourceState state = new SourceState();
+    state.setProp(ConfigurationKeys.DESTINATION_DATASET_HANDLER_CLASS,
+        TestDestinationDatasetHandler.class.getName() + "," + 
TestDestinationDatasetHandler.class.getName());
+    DestinationDatasetHandlerService service = new 
DestinationDatasetHandlerService(state, true, this.eventSubmitter);
+    List<WorkUnit> workUnits = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      WorkUnit wu = WorkUnit.createEmpty();
+      workUnits.add(wu);
+    }
+    WorkUnitStream workUnitStream = new 
BasicWorkUnitStream.Builder(workUnits).build();
+    service.executeHandlers(workUnitStream);
+
+    for (WorkUnit wu: workUnits) {
+      // there were 2 handlers, each should have added to counter
+      
Assert.assertEquals(wu.getPropAsInt(TestDestinationDatasetHandler.TEST_COUNTER_KEY),
 2);
+    }
+  }
+
+  @Test
+  void testMultipleHandlersWhitespace() throws Exception {
+    SourceState state = new SourceState();
+    // add whitespace in class list
+    state.setProp(ConfigurationKeys.DESTINATION_DATASET_HANDLER_CLASS,
+        TestDestinationDatasetHandler.class.getName() + "    ,      " + 
TestDestinationDatasetHandler.class.getName());
+    DestinationDatasetHandlerService service = new 
DestinationDatasetHandlerService(state, true, this.eventSubmitter);
+    List<WorkUnit> workUnits = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      WorkUnit wu = WorkUnit.createEmpty();
+      workUnits.add(wu);
+    }
+    WorkUnitStream workUnitStream = new 
BasicWorkUnitStream.Builder(workUnits).build();
+    service.executeHandlers(workUnitStream);
+
+    for (WorkUnit wu: workUnits) {
+      // there were 2 handlers, each should have added to counter
+      
Assert.assertEquals(wu.getPropAsInt(TestDestinationDatasetHandler.TEST_COUNTER_KEY),
 2);
+    }
+  }
+
+  @Test
+  // should not throw an exception
+  void testEmpty() throws Exception {
+    SourceState state = new SourceState();
+    DestinationDatasetHandlerService service = new 
DestinationDatasetHandlerService(state, true, this.eventSubmitter);
+    List<WorkUnit> workUnits = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      WorkUnit wu = WorkUnit.createEmpty();
+      workUnits.add(wu);
+    }
+    WorkUnitStream workUnitStream = new 
BasicWorkUnitStream.Builder(workUnits).build();
+    service.executeHandlers(workUnitStream);
+  }
+}
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/destination/TestDestinationDatasetHandler.java
similarity index 52%
copy from 
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
copy to 
gobblin-core/src/test/java/org/apache/gobblin/destination/TestDestinationDatasetHandler.java
index c27b839..102c38c 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/destination/TestDestinationDatasetHandler.java
@@ -15,14 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.data.management.copy;
+package org.apache.gobblin.destination;
 
-import org.apache.gobblin.dataset.Dataset;
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.source.workunit.WorkUnit;
 
 
-/**
- * A common superinterface for {@link Dataset}s that can be operated on by 
distcp.
- * Concrete classes must implement a subinterface of this interface ({@link 
CopyableDataset} or {@link IterableCopyableDataset}).
- */
-public interface CopyableDatasetBase extends Dataset {
+public class TestDestinationDatasetHandler implements 
DestinationDatasetHandler {
+  public static String TEST_COUNTER_KEY = "counter";
+  private Boolean canCleanUp;
+  public TestDestinationDatasetHandler(SourceState state, Boolean canCleanUp){
+    this.canCleanUp = canCleanUp;
+  }
+
+  @Override
+  public void handle(Collection<WorkUnit> workUnits) {
+    for (WorkUnit wu: workUnits) {
+      wu.setProp(TEST_COUNTER_KEY, wu.getPropAsInt(TEST_COUNTER_KEY, 0) + 1);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {}
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 8b72d54..186b7ca 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -53,8 +53,6 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.data.management.copy.extractor.EmptyExtractor;
 import 
org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractor;
-import org.apache.gobblin.data.management.copy.hive.HiveDataset;
-import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
 import 
org.apache.gobblin.data.management.copy.prioritization.FileSetComparator;
 import 
org.apache.gobblin.data.management.copy.publisher.CopyEventSubmitterHelper;
 import org.apache.gobblin.data.management.copy.replication.ConfigBasedDataset;
@@ -132,8 +130,6 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
   public static final String FILESET_TOTAL_ENTITIES = "fileset.total.entities";
   public static final String FILESET_TOTAL_SIZE_IN_BYTES = 
"fileset.total.size";
   public static final String SCHEMA_CHECK_ENABLED = "shcema.check.enabled";
-  public static final String DATASET_STAGING_DIR_PATH = 
"dataset.staging.dir.path";
-  public static final String DATASET_STAGING_PATH = "dataset.staging.path";
   public final static boolean DEFAULT_SCHEMA_CHECK_ENABLED = false;
 
   private static final String WORK_UNIT_WEIGHT = CopyConfiguration.COPY_PREFIX 
+ ".workUnitWeight";
@@ -370,9 +366,11 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
               workUnit.setProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA, 
((ConfigBasedDataset) this.copyableDataset).getExpectedSchema());
             }
           }
-          if ((this.copyableDataset instanceof HiveDataset) && 
(state.getPropAsBoolean(ConfigurationKeys.IS_DATASET_STAGING_DIR_USED,false))) {
-            workUnit.setProp(DATASET_STAGING_DIR_PATH, ((HiveDataset) 
this.copyableDataset).getProperties().getProperty(DATASET_STAGING_PATH));
-          }
+
+          // Ensure that the writer temporary directories are contained within 
the dataset shard
+          String datasetPath = this.copyableDataset.getDatasetPath();
+          workUnit.setProp(ConfigurationKeys.DATASET_DESTINATION_PATH, 
datasetPath);
+
           serializeCopyEntity(workUnit, copyEntity);
           serializeCopyableDataset(workUnit, metadata);
           GobblinMetrics.addCustomTagToState(workUnit,
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
index c27b839..077a062 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
@@ -25,4 +25,5 @@ import org.apache.gobblin.dataset.Dataset;
  * Concrete classes must implement a subinterface of this interface ({@link 
CopyableDataset} or {@link IterableCopyableDataset}).
  */
 public interface CopyableDatasetBase extends Dataset {
+  default String getDatasetPath() { return ""; }
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index ad8d167..3fb0d08 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -44,8 +44,10 @@ import lombok.Getter;
 import lombok.Singular;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.data.management.copy.CopyConfiguration;
 import org.apache.gobblin.data.management.copy.CopyEntity;
@@ -82,7 +84,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InvalidInputException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TException;
 
@@ -141,7 +142,6 @@ public class HiveCopyEntityHelper {
    * partitions with Path containing '/Hourly/' will be kept.
    */
   public static final String HIVE_PARTITION_EXTENDED_FILTER_TYPE = 
HiveDatasetFinder.HIVE_DATASET_PREFIX + ".extendedFilterType";
-
   static final Gson gson = new Gson();
 
   private static final String source_client = "source_client";
@@ -168,7 +168,7 @@ public class HiveCopyEntityHelper {
 
   private final HiveDataset dataset;
   private final CopyConfiguration configuration;
-  private final FileSystem targetFs;
+  private FileSystem targetFs;
 
   private final HiveMetastoreClientPool targetClientPool;
   private final String targetDatabase;
@@ -180,15 +180,14 @@ public class HiveCopyEntityHelper {
   private final ExistingEntityPolicy existingEntityPolicy;
   private final UnmanagedDataPolicy unmanagedDataPolicy;
   private final Optional<String> partitionFilter;
-  private Optional<? extends HivePartitionExtendedFilter> 
hivePartitionExtendedFilter;
+  private final Optional<? extends HivePartitionExtendedFilter> 
hivePartitionExtendedFilter;
   private final Optional<Predicate<HivePartitionFileSet>> fastPartitionSkip;
   private final Optional<Predicate<HiveCopyEntityHelper>> fastTableSkip;
-
   private final DeregisterFileDeleteMethod deleteMethod;
 
   private final Optional<CommitStep> tableRegistrationStep;
-  private final Map<List<String>, Partition> sourcePartitions;
-  private final Map<List<String>, Partition> targetPartitions;
+  private Map<List<String>, Partition> sourcePartitions;
+  private Map<List<String>, Partition> targetPartitions;
   private final boolean enforceFileSizeMatch;
   private final EventSubmitter eventSubmitter;
   @Getter
@@ -265,7 +264,6 @@ public class HiveCopyEntityHelper {
       this.dataset = dataset;
       this.configuration = configuration;
       this.targetFs = targetFs;
-
       this.targetPathHelper = new HiveTargetPathHelper(this.dataset);
       this.enforceFileSizeMatch = configuration.isEnforceFileLengthMatch();
       this.hiveRegProps = new HiveRegProps(new 
State(this.dataset.getProperties()));
@@ -287,58 +285,14 @@ public class HiveCopyEntityHelper {
           
.valueOf(this.dataset.getProperties().getProperty(DELETE_FILES_ON_DEREGISTER).toUpperCase())
           : DEFAULT_DEREGISTER_DELETE_METHOD;
 
-      if 
(this.dataset.getProperties().containsKey(COPY_PARTITION_FILTER_GENERATOR)) {
-        try {
-          PartitionFilterGenerator generator = 
GobblinConstructorUtils.invokeFirstConstructor(
-              (Class<PartitionFilterGenerator>) Class
-                  
.forName(this.dataset.getProperties().getProperty(COPY_PARTITION_FILTER_GENERATOR)),
-              Lists.<Object> newArrayList(this.dataset.getProperties()), 
Lists.newArrayList());
-          this.partitionFilter = 
Optional.of(generator.getFilter(this.dataset));
-          log.info(String.format("Dynamic partition filter for table %s: %s.", 
this.dataset.table.getCompleteName(),
-              this.partitionFilter.get()));
-        } catch (ReflectiveOperationException roe) {
-          throw new IOException(roe);
-        }
-      } else {
-        this.partitionFilter =
-            
Optional.fromNullable(this.dataset.getProperties().getProperty(COPY_PARTITIONS_FILTER_CONSTANT));
-      }
-
-      // Initialize extended partition filter
-      if ( 
this.dataset.getProperties().containsKey(HIVE_PARTITION_EXTENDED_FILTER_TYPE)){
-        String filterType = 
dataset.getProperties().getProperty(HIVE_PARTITION_EXTENDED_FILTER_TYPE);
-        try {
-          Config config = 
ConfigFactory.parseProperties(this.dataset.getProperties());
-          this.hivePartitionExtendedFilter =
-              Optional.of(new 
ClassAliasResolver<>(HivePartitionExtendedFilterFactory.class).resolveClass(filterType).newInstance().createFilter(config));
-        } catch (ReflectiveOperationException roe) {
-          log.error("Error: Could not find filter with alias " + filterType);
-          closer.close();
-          throw new IOException(roe);
-        }
-      }
-      else {
-        this.hivePartitionExtendedFilter = Optional.absent();
-      }
-
       try {
-        this.fastPartitionSkip = 
this.dataset.getProperties().containsKey(FAST_PARTITION_SKIP_PREDICATE)
-            ? Optional.of(GobblinConstructorUtils.invokeFirstConstructor(
-            (Class<Predicate<HivePartitionFileSet>>) Class
-                
.forName(this.dataset.getProperties().getProperty(FAST_PARTITION_SKIP_PREDICATE)),
-            Lists.<Object> newArrayList(this), Lists.newArrayList()))
-            : Optional.<Predicate<HivePartitionFileSet>> absent();
-
-        this.fastTableSkip = 
this.dataset.getProperties().containsKey(FAST_TABLE_SKIP_PREDICATE)
-            ? Optional.of(GobblinConstructorUtils.invokeFirstConstructor(
-            (Class<Predicate<HiveCopyEntityHelper>>) Class
-                
.forName(this.dataset.getProperties().getProperty(FAST_TABLE_SKIP_PREDICATE)),
-            Lists.newArrayList()))
-            : Optional.<Predicate<HiveCopyEntityHelper>> absent();
-
-      } catch (ReflectiveOperationException roe) {
+        this.partitionFilter = this.initializePartitionFilter();
+        this.hivePartitionExtendedFilter = 
this.initializeExtendedPartitionFilter();
+        this.fastPartitionSkip = this.initializePartitionSkipper();
+        this.fastTableSkip = this.initializeTableSkipper();
+      } catch (ReflectiveOperationException e) {
         closer.close();
-        throw new IOException(roe);
+        throw new IOException(e);
       }
 
       Map<String, HiveMetastoreClientPool> namedPools =
@@ -354,13 +308,14 @@ public class HiveCopyEntityHelper {
           this.existingTargetTable = Optional.absent();
         }
 
-        // Constructing CommitStep object for table registration
-        Path targetPath = getTargetLocation(this.dataset.fs, this.targetFs, 
this.dataset.table.getDataLocation(),
-            Optional.<Partition> absent());
+        Path targetPath = getTargetLocation(this.targetFs, 
this.dataset.table.getDataLocation(), Optional.<Partition>absent());
+        this.dataset.setDatasetPath(targetPath.toUri().getRawPath());
+
         this.targetTable = getTargetTable(this.dataset.table, targetPath);
         HiveSpec tableHiveSpec = new SimpleHiveSpec.Builder<>(targetPath)
             
.withTable(HiveMetaStoreUtils.getHiveTable(this.targetTable.getTTable())).build();
 
+        // Constructing CommitStep object for table registration
         CommitStep tableRegistrationStep =
             new HiveRegisterStep(this.targetMetastoreURI, tableHiveSpec, 
this.hiveRegProps);
         this.tableRegistrationStep = Optional.of(tableRegistrationStep);
@@ -368,21 +323,7 @@ public class HiveCopyEntityHelper {
         if (this.existingTargetTable.isPresent() && 
this.existingTargetTable.get().isPartitioned()) {
           checkPartitionedTableCompatibility(this.targetTable, 
this.existingTargetTable.get());
         }
-        if (this.dataset.table.isPartitioned()) {
-          this.sourcePartitions = 
HiveUtils.getPartitionsMap(multiClient.getClient(source_client), 
this.dataset.table,
-              this.partitionFilter, this.hivePartitionExtendedFilter);
-          
HiveAvroCopyEntityHelper.updatePartitionAttributesIfAvro(this.targetTable, 
this.sourcePartitions, this);
-
-          // Note: this must be mutable, so we copy the map
-          this.targetPartitions =
-              this.existingTargetTable.isPresent() ? Maps.newHashMap(
-                  
HiveUtils.getPartitionsMap(multiClient.getClient(target_client),
-                      this.existingTargetTable.get(), this.partitionFilter, 
this.hivePartitionExtendedFilter))
-                  : Maps.<List<String>, Partition> newHashMap();
-        } else {
-          this.sourcePartitions = Maps.newHashMap();
-          this.targetPartitions = Maps.newHashMap();
-        }
+        initializeSourceAndTargetTablePartitions(multiClient);
 
       } catch (TException te) {
         closer.close();
@@ -392,6 +333,91 @@ public class HiveCopyEntityHelper {
   }
 
   /**
+   * Checks {@value COPY_PARTITION_FILTER_GENERATOR} in configuration to 
determine which class to use for hive filtering
+   * Default is to filter based on {@value COPY_PARTITIONS_FILTER_CONSTANT}, a 
constant regex
+   * @throws ReflectiveOperationException if the generator class in the 
configuration is not found
+   */
+  private Optional<String> initializePartitionFilter() throws 
ReflectiveOperationException {
+    if 
(this.dataset.getProperties().containsKey(COPY_PARTITION_FILTER_GENERATOR)) {
+      PartitionFilterGenerator generator = 
GobblinConstructorUtils.invokeFirstConstructor(
+          (Class<PartitionFilterGenerator>) Class.forName(
+              
this.dataset.getProperties().getProperty(COPY_PARTITION_FILTER_GENERATOR)),
+          Lists.<Object>newArrayList(this.dataset.getProperties()), 
Lists.newArrayList());
+      Optional<String> partitionFilter = 
Optional.of(generator.getFilter(this.dataset));
+      log.info(String.format("Dynamic partition filter for table %s: %s.", 
this.dataset.table.getCompleteName(),
+          partitionFilter.get()));
+      return partitionFilter;
+    } else {
+      return 
Optional.fromNullable(this.dataset.getProperties().getProperty(COPY_PARTITIONS_FILTER_CONSTANT));
+    }
+  }
+
+  /**
+   * Checks {@value HIVE_PARTITION_EXTENDED_FILTER_TYPE} in configuration to 
initialize more granular filtering class
+   * Default is to use none
+   * @throws IOException if the filter cannot be created from the configuration
+   * @throws ReflectiveOperationException if the filter class named in the configuration is not found
+   */
+  private Optional<HivePartitionExtendedFilter> 
initializeExtendedPartitionFilter() throws IOException, 
ReflectiveOperationException {
+    if 
(this.dataset.getProperties().containsKey(HIVE_PARTITION_EXTENDED_FILTER_TYPE)){
+      String filterType = 
dataset.getProperties().getProperty(HIVE_PARTITION_EXTENDED_FILTER_TYPE);
+      Config config = 
ConfigFactory.parseProperties(this.dataset.getProperties());
+      return Optional.of(new 
ClassAliasResolver<>(HivePartitionExtendedFilterFactory.class).resolveClass(filterType).newInstance().createFilter(config));
+    } else {
+      return Optional.absent();
+    }
+  }
+
+  /**
+   * Checks {@value FAST_PARTITION_SKIP_PREDICATE} in configuration to 
determine the class used to find which hive partitions to skip
+   * Default is to skip none
+   * @throws ReflectiveOperationException if class in configuration is not 
found
+   */
+  private Optional<Predicate<HivePartitionFileSet>> 
initializePartitionSkipper() throws ReflectiveOperationException {
+    return 
this.dataset.getProperties().containsKey(FAST_PARTITION_SKIP_PREDICATE)
+        ? Optional.of(GobblinConstructorUtils.invokeFirstConstructor(
+        (Class<Predicate<HivePartitionFileSet>>) Class
+            
.forName(this.dataset.getProperties().getProperty(FAST_PARTITION_SKIP_PREDICATE)),
+        Lists.<Object> newArrayList(this), Lists.newArrayList()))
+        : Optional.<Predicate<HivePartitionFileSet>> absent();
+  }
+
+  /**
+   * Checks {@value FAST_TABLE_SKIP_PREDICATE} in configuration to determine 
the class used to find which hive tables to skip
+   * Default is to skip none
+   * @throws ReflectiveOperationException if class in configuration is not 
found
+   */
+  private Optional<Predicate<HiveCopyEntityHelper>> initializeTableSkipper() 
throws ReflectiveOperationException {
+    return this.dataset.getProperties().containsKey(FAST_TABLE_SKIP_PREDICATE)
+        ? Optional.of(GobblinConstructorUtils.invokeFirstConstructor(
+        (Class<Predicate<HiveCopyEntityHelper>>) Class
+            
.forName(this.dataset.getProperties().getProperty(FAST_TABLE_SKIP_PREDICATE)),
+        Lists.newArrayList()))
+        : Optional.<Predicate<HiveCopyEntityHelper>> absent();
+  }
+
+  /**
+   * Initializes the corresponding source and target partitions after applying 
the hive partition filters
+   * @param multiClient the client container providing the source and target {@link IMetaStoreClient}s
+   * @throws IOException if encountering a hive error when determining 
partitions
+   */
+  private void 
initializeSourceAndTargetTablePartitions(HiveMetastoreClientPool.MultiClient 
multiClient) throws IOException {
+    if (this.dataset.table.isPartitioned()) {
+      this.sourcePartitions = 
HiveUtils.getPartitionsMap(multiClient.getClient(source_client), 
this.dataset.table, this.partitionFilter,
+          this.hivePartitionExtendedFilter);
+      
HiveAvroCopyEntityHelper.updatePartitionAttributesIfAvro(this.targetTable, 
this.sourcePartitions, this);
+
+      // Note: this must be mutable, so we copy the map
+      this.targetPartitions = this.existingTargetTable.isPresent() ? 
Maps.newHashMap(
+          HiveUtils.getPartitionsMap(multiClient.getClient(target_client), 
this.existingTargetTable.get(), this.partitionFilter,
+              this.hivePartitionExtendedFilter))
+          : Maps.<List<String>, Partition>newHashMap();
+    } else {
+        this.sourcePartitions = Maps.newHashMap();
+        this.targetPartitions = Maps.newHashMap();
+      }
+  }
+
+  /**
    * See {@link #getCopyEntities(CopyConfiguration, Comparator, 
PushDownRequestor)}. This method does not pushdown any prioritizer.
    */
   Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration 
configuration) throws IOException {
@@ -639,7 +665,7 @@ public class HiveCopyEntityHelper {
     Map<Path, FileStatus> desiredTargetExistingPaths;
     try {
       desiredTargetExistingPaths = desiredTargetLocation.getPaths();
-    } catch (InvalidInputException ioe) {
+    } catch (IOException ioe) {
       // Thrown if inputFormat cannot find location in target. Since location 
doesn't exist, this set is empty.
       desiredTargetExistingPaths = Maps.newHashMap();
     }
@@ -791,14 +817,12 @@ public class HiveCopyEntityHelper {
 
   /**
    * Compute the target location for a Hive location.
-   * @param sourceFs Source {@link FileSystem}.
    * @param path source {@link Path} in Hive location.
    * @param partition partition these paths correspond to.
    * @return transformed location in the target.
    * @throws IOException if cannot generate a single target location.
    */
-  Path getTargetLocation(FileSystem sourceFs, FileSystem targetFs, Path path, 
Optional<Partition> partition)
-      throws IOException {
+  Path getTargetLocation(FileSystem targetFs, Path path, Optional<Partition> 
partition) {
     return getTargetPathHelper().getTargetPath(path, targetFs, partition, 
false);
   }
 
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
index 059efff..319b980 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import lombok.Getter;
+import lombok.Setter;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
@@ -86,7 +87,6 @@ public class HiveDataset implements 
PrioritizedCopyableDataset {
   public static final String DATASET_NAME_PATTERN_KEY = 
"hive.datasetNamePattern";
   public static final String DATABASE = "Database";
   public static final String TABLE = "Table";
-  public static final String DATASET_STAGING_PATH = "dataset.staging.path";
 
   public static final String DATABASE_TOKEN = "$DB";
   public static final String TABLE_TOKEN = "$TABLE";
@@ -94,6 +94,10 @@ public class HiveDataset implements 
PrioritizedCopyableDataset {
   public static final String LOGICAL_DB_TOKEN = "$LOGICAL_DB";
   public static final String LOGICAL_TABLE_TOKEN = "$LOGICAL_TABLE";
 
+  @Getter
+  @Setter
+  private String datasetPath;
+
   // Will not be serialized/de-serialized
   @Getter
   protected transient final Properties properties;
@@ -128,11 +132,6 @@ public class HiveDataset implements 
PrioritizedCopyableDataset {
         Optional.fromNullable(this.table.getDataLocation());
 
     this.tableIdentifier = this.table.getDbName() + "." + 
this.table.getTableName();
-    Path tableLocation = this.table.getPath();
-    if (!(this.properties.isEmpty())) {
-      String datasetStagingDir = 
this.properties.getProperty(COPY_TARGET_TABLE_PREFIX_REPLACEMENT) + "/" + 
tableLocation.getName();
-      properties.setProperty(DATASET_STAGING_PATH,datasetStagingDir);
-    }
 
     this.datasetNamePattern = 
Optional.fromNullable(ConfigUtils.getString(datasetConfig, 
DATASET_NAME_PATTERN_KEY, null));
     this.dbAndTable = new DbAndTable(table.getDbName(), table.getTableName());
@@ -338,4 +337,5 @@ public class HiveDataset implements 
PrioritizedCopyableDataset {
     }
     return true;
   }
+
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
index 98f30da..8f1f208 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
@@ -85,7 +85,7 @@ public class HivePartitionFileSet extends HiveFileSet {
       stepPriority = hiveCopyEntityHelper.addSharedSteps(copyEntities, 
fileSet, stepPriority);
 
       multiTimer.nextStage(HiveCopyEntityHelper.Stages.COMPUTE_TARGETS);
-      Path targetPath = 
hiveCopyEntityHelper.getTargetLocation(hiveCopyEntityHelper.getDataset().fs, 
hiveCopyEntityHelper.getTargetFs(),
+      Path targetPath = 
hiveCopyEntityHelper.getTargetLocation(hiveCopyEntityHelper.getTargetFs(),
           this.partition.getDataLocation(), Optional.of(this.partition));
       Partition targetPartition = getTargetPartition(this.partition, 
targetPath);
 
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index 41d4d85..f143690 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -79,8 +79,6 @@ import org.apache.gobblin.util.WriterUtils;
 @Slf4j
 public class CopyDataPublisher extends DataPublisher implements 
UnpublishedHandling {
 
-  private final Path writerOutputDir;
-
   @Override
   public boolean isThreadSafe() {
     return this.getClass() == CopyDataPublisher.class;
@@ -116,10 +114,6 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
     String uri = this.state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, 
ConfigurationKeys.LOCAL_FS_URI);
     this.fs = FileSystem.get(URI.create(uri), 
WriterUtils.getFsConfiguration(state));
 
-    FileAwareInputStreamDataWriterBuilder.setJobSpecificOutputPaths(state);
-
-    this.writerOutputDir = new 
Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
-
     MetricContext metricContext =
         Instrumented.getMetricContext(state, CopyDataPublisher.class, 
GobblinMetrics.getCustomTagsFromState(state));
 
@@ -225,9 +219,16 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
     Preconditions.checkArgument(!datasetWorkUnitStates.isEmpty(),
         "publishFileSet received an empty collection work units. This is an 
error in code.");
 
-    CopyableDatasetMetadata metadata = CopyableDatasetMetadata
-        
.deserialize(datasetWorkUnitStates.iterator().next().getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
-    Path datasetWriterOutputPath = new Path(this.writerOutputDir, 
datasetAndPartition.identifier());
+    WorkUnitState sampledWorkUnitState =  
datasetWorkUnitStates.iterator().next();
+
+    CopyableDatasetMetadata metadata = CopyableDatasetMetadata.deserialize(
+        sampledWorkUnitState.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
+
+    // If not already done, ensure that the writer outputs have the job ID 
appended to avoid corruption from previous runs
+    
FileAwareInputStreamDataWriterBuilder.setJobSpecificOutputPaths(sampledWorkUnitState);
+    Path writerOutputDir = new 
Path(sampledWorkUnitState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
+
+    Path datasetWriterOutputPath = new Path(writerOutputDir, 
datasetAndPartition.identifier());
 
     log.info("Merging all split work units.");
     DistcpFileSplitter.mergeAllSplitWorkUnits(this.fs, datasetWorkUnitStates);
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index cba3b92..0c99088 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -84,8 +84,6 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
   public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
   public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = 
"gobblin.copy.task.overwrite.on.commit";
   public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = 
false;
-  public static final String STAGING_DIR_SUFFIX = "/taskStaging";
-  public static final String DATASET_STAGING_DIR_PATH = 
"dataset.staging.dir.path";
 
   protected final AtomicLong bytesWritten = new AtomicLong();
   protected final AtomicLong filesWritten = new AtomicLong();
@@ -146,20 +144,8 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
       this.fs = FileSystem.get(uri, conf);
     }
     this.fileContext = FileContext.getFileContext(uri, conf);
-
-    /**
-     * The staging directory defines the path of staging folder.
-     * USER_DEFINED_STATIC_STAGING_DIR_FLAG shall be set to true when user 
wants to specify the staging folder and the directory can be fetched through 
USER_DEFINED_STATIC_STAGING_DIR property.
-     * IS_DATASET_STAGING_DIR_USED when true creates the staging folder within 
a dataset location for dataset copying.
-     * Else system will calculate the staging directory automatically.
-     */
     if 
(state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false)) 
{
       this.stagingDir = new 
Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR));
-    } else if 
((state.getPropAsBoolean(ConfigurationKeys.IS_DATASET_STAGING_DIR_USED,false))) 
{
-      String stgDir = state.getProp(DATASET_STAGING_DIR_PATH) + 
STAGING_DIR_SUFFIX + "/" + state.getProp(ConfigurationKeys.JOB_NAME_KEY ) + "/" 
+ state.getProp(ConfigurationKeys.JOB_ID_KEY);
-      state.setProp(ConfigurationKeys.WRITER_STAGING_DIR,stgDir);
-      this.stagingDir = this.writerAttemptIdOptional.isPresent() ? 
WriterUtils.getWriterStagingDir(state, numBranches, branchId, 
this.writerAttemptIdOptional.get())
-          : WriterUtils.getWriterStagingDir(state, numBranches, branchId);
     } else {
       this.stagingDir = this.writerAttemptIdOptional.isPresent() ? 
WriterUtils.getWriterStagingDir(state, numBranches, branchId, 
this.writerAttemptIdOptional.get())
           : WriterUtils.getWriterStagingDir(state, numBranches, branchId);
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
index 377b2cf..7db7b07 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
@@ -16,13 +16,16 @@
  */
 
 package org.apache.gobblin.data.management.copy;
-
+import com.google.common.io.Files;
+import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Iterator;
 import java.util.List;
-
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
@@ -190,4 +193,40 @@ public class CopySourceTest {
     Assert.assertEquals(fileSet.getTotalEntities(), 5);
     Assert.assertEquals(fileSet.getTotalSizeInBytes(), 50);
   }
+
+  @Test
+  public void testDefaultHiveDatasetShardTempPaths()
+      throws IOException, NoSuchMethodException, InvocationTargetException, 
IllegalAccessException {
+    SourceState state = new SourceState();
+    Properties copyProperties = new Properties();
+    copyProperties.put(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target");
+    File tempDir = Files.createTempDir();
+    String tempDirRoot = tempDir.getPath();
+    tempDir.deleteOnExit();
+
+    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+    state.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, "file:///");
+    state.setProp("hive.dataset.whitelist", "testDB.table*"); // using a mock 
class so the finder will always find 3 tables regardless of this setting
+    state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target/dir");
+    state.setProp(DatasetUtils.DATASET_PROFILE_CLASS_KEY, 
MockHiveDatasetFinder.class.getName());
+    state.setProp(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR, "true");
+    state.setProp("tempDirRoot", tempDirRoot);
+    state.setProp(CopyConfiguration.STORE_REJECTED_REQUESTS_KEY,
+        
RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name().toLowerCase());
+    state.setProp(ConfigurationKeys.JOB_NAME_KEY, "jobName");
+    state.setProp(ConfigurationKeys.JOB_ID_KEY, "jobId");
+    CopySource source = new CopySource();
+
+    List<WorkUnit> workunits = source.getWorkunits(state);
+    workunits = JobLauncherUtils.flattenWorkUnits(workunits);
+    Assert.assertEquals(workunits.size(), 6); // workunits are created for pre 
and post publish steps
+
+    // workunits are not guaranteed to be created in any order, remove 
duplicate paths
+    Set<String> datasetPaths = workunits.stream().map(w -> 
w.getProp(ConfigurationKeys.DATASET_DESTINATION_PATH)).collect(
+        Collectors.toSet());
+
+    for (int i = 0; i < 3; i++) {
+      Assert.assertEquals(datasetPaths.contains(tempDirRoot + 
"/targetPath/testDB/table" + i), true);
+    }
+  }
 }
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/MockHiveDatasetFinder.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/MockHiveDatasetFinder.java
new file mode 100644
index 0000000..98d580c
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/MockHiveDatasetFinder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.data.management.copy.hive.HiveDataset;
+import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
+import org.apache.gobblin.data.management.copy.hive.HiveTargetPathHelper;
+import org.apache.gobblin.hive.HiveMetastoreClientPool;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.metadata.Table;
+
+
+public class MockHiveDatasetFinder extends HiveDatasetFinder {
+
+  Path tempDirRoot;
+  SourceState state;
+
+  public MockHiveDatasetFinder(LocalFileSystem fs, Properties properties,
+      EventSubmitter eventSubmitter) throws IOException {
+    super(fs, properties, eventSubmitter);
+    this.tempDirRoot = new Path(properties.getProperty("tempDirRoot"));
+  }
+
+  public Collection<DbAndTable> getTables() throws IOException {
+    List<DbAndTable> tables = Lists.newArrayList();
+
+    for (int i = 0; i < 3; i++) {
+      tables.add(new DbAndTable("testDB", "table" + i));
+    }
+
+    return tables;
+  }
+
+  @Override
+  public Iterator<HiveDataset> getDatasetsIterator() throws IOException {
+
+    return new AbstractIterator<HiveDataset>() {
+      private Iterator<DbAndTable> tables = getTables().iterator();
+
+      @Override
+      protected HiveDataset computeNext() {
+        try {
+          while (this.tables.hasNext()) {
+            DbAndTable dbAndTable = this.tables.next();
+            File dbPath = new File(tempDirRoot + "/testPath/testDB/" + 
dbAndTable.getTable());
+            fs.mkdirs(new Path(dbPath.getAbsolutePath()));
+            Properties hiveProperties = new Properties();
+            
hiveProperties.setProperty(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR, 
"true");
+            
hiveProperties.setProperty(HiveTargetPathHelper.COPY_TARGET_TABLE_PREFIX_TOBE_REPLACED,
 tempDirRoot + "/testPath");
+            
hiveProperties.setProperty(HiveTargetPathHelper.COPY_TARGET_TABLE_PREFIX_REPLACEMENT,
 tempDirRoot + "/targetPath");
+            Table table = new Table(Table.getEmptyTable(dbAndTable.getDb(), 
dbAndTable.getTable()));
+            table.setDataLocation(new Path(dbPath.getPath()));
+            HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(new 
Properties(), Optional.absent());
+            HiveDataset dataset = new HiveDataset(new LocalFileSystem(), pool, 
table, hiveProperties);
+            return dataset;
+          }
+        } catch (IOException e) {
+          Throwables.propagate(e);
+        }
+        return endOfData();
+      }
+    };
+  }
+}
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
index fc35bc1..5303c40 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
@@ -34,6 +34,10 @@ import 
org.apache.gobblin.data.management.copy.entities.PostPublishStep;
 import 
org.apache.gobblin.data.management.copy.hive.HiveCopyEntityHelper.DeregisterFileDeleteMethod;
 import org.apache.gobblin.hive.HiveRegProps;
 import org.apache.gobblin.metrics.event.MultiTimingEvent;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.hive.HiveMetastoreClientPool;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
@@ -361,6 +365,73 @@ public class HiveCopyEntityHelperTest {
     
Assert.assertFalse(meta_table.getSd().getSerdeInfo().getParameters().containsKey("path"));
   }
 
+  @Test
+  public void testGetTargetLocationDefault() throws Exception {
+
+    Properties copyProperties = new Properties();
+    copyProperties.put(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target");
+    Path testPath = new Path("/testPath");
+    Properties hiveProperties = new Properties();
+    Table table =  new Table(Table.getEmptyTable("testDB", "testTable"));
+    table.setDataLocation(testPath);
+    HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(new 
Properties(), Optional.absent());
+    HiveDataset dataset = new HiveDataset(new LocalFileSystem(), pool, table, 
hiveProperties);
+
+    HiveCopyEntityHelper helper = new HiveCopyEntityHelper(dataset,
+        CopyConfiguration.builder(FileSystem.getLocal(new Configuration()), 
copyProperties).build(),
+        new LocalFileSystem()
+    );
+
+    FileSystem fs = new LocalFileSystem();
+    // test that by default, the input path is the same as the output path
+    Path path = helper.getTargetLocation(fs, testPath, 
Optional.<Partition>absent());
+    Assert.assertEquals(testPath.toUri().getRawPath(), 
path.toUri().getRawPath());
+  }
+
+  @Test
+  public void testSetsDatasetShardPath() throws Exception {
+
+    Properties copyProperties = new Properties();
+    copyProperties.put(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target");
+    Path testPath = new Path("/testPath/db/table");
+    Properties hiveProperties = new Properties();
+    hiveProperties.setProperty(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR, 
"true");
+    Table table =  new Table(Table.getEmptyTable("testDB", "testTable"));
+    table.setDataLocation(testPath);
+    HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(new 
Properties(), Optional.absent());
+    HiveDataset dataset = new HiveDataset(new LocalFileSystem(), pool, table, 
hiveProperties);
+
+    HiveCopyEntityHelper helper = new HiveCopyEntityHelper(dataset,
+        CopyConfiguration.builder(FileSystem.getLocal(new Configuration()), 
copyProperties).build(),
+        new LocalFileSystem()
+    );
+
+    Assert.assertEquals(helper.getDataset().getDatasetPath(), 
"/testPath/db/table");
+  }
+
+  @Test
+  public void testSetsDatasetShardPathWithReplacement() throws Exception {
+
+    Properties copyProperties = new Properties();
+    copyProperties.put(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target");
+    Path testPath = new Path("/testPath/db/table");
+    Properties hiveProperties = new Properties();
+    hiveProperties.setProperty(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR, 
"true");
+    
hiveProperties.setProperty(HiveTargetPathHelper.COPY_TARGET_TABLE_PREFIX_TOBE_REPLACED,
 "/testPath");
+    
hiveProperties.setProperty(HiveTargetPathHelper.COPY_TARGET_TABLE_PREFIX_REPLACEMENT,
 "/targetPath");
+    Table table =  new Table(Table.getEmptyTable("testDB", "testTable"));
+    table.setDataLocation(testPath);
+    HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(new 
Properties(), Optional.absent());
+    HiveDataset dataset = new HiveDataset(new LocalFileSystem(), pool, table, 
hiveProperties);
+
+    HiveCopyEntityHelper helper = new HiveCopyEntityHelper(dataset,
+        CopyConfiguration.builder(FileSystem.getLocal(new Configuration()), 
copyProperties).build(),
+        new LocalFileSystem()
+    );
+
+    Assert.assertEquals(helper.getDataset().getDatasetPath(), 
"/targetPath/db/table");
+  }
+
   private boolean containsPath(Collection<FileStatus> statuses, Path path) {
     for (FileStatus status : statuses) {
       if (status.getPath().equals(path)) {
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisherTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisherTest.java
index 498b244..4e5e8e5 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisherTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisherTest.java
@@ -236,11 +236,12 @@ public class CopyDataPublisherTest {
     private Path targetPath;
     private FileSystem fs;
     private CopyEntity copyEntity;
+    State state;
 
     private void createDatasetFiles() throws IOException {
       // Create writer output files
       Path datasetWriterOutputPath =
-          new Path(writerOutputPath, 
copyEntity.getDatasetAndPartition(this.metadata).identifier());
+          new Path(writerOutputPath + "/" + 
state.getProp(ConfigurationKeys.JOB_ID_KEY), 
copyEntity.getDatasetAndPartition(this.metadata).identifier());
       Path outputPathWithCurrentDirectory = new Path(datasetWriterOutputPath,
           PathUtils.withoutLeadingSeparator(this.targetPath));
       for (String path : relativeFilePaths) {
@@ -259,7 +260,7 @@ public class CopyDataPublisherTest {
       this.metadata = new CopyableDatasetMetadata(this.copyableDataset);
       this.relativeFilePaths = relativeFilePaths;
       this.writerOutputPath = new 
Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
-
+      this.state = state;
       this.targetPath = new Path(testMethodTempPath, datasetTargetPath);
 
       FileStatus file = new FileStatus(0, false, 0, 0, 0, new Path("/file"));
@@ -277,6 +278,7 @@ public class CopyDataPublisherTest {
       List<WorkUnitState> workUnitStates =
           Lists.newArrayList(new WorkUnitState(), new WorkUnitState(), new 
WorkUnitState());
       for (WorkUnitState wus : workUnitStates) {
+        wus.addAll(this.state); // propagate job state into work unit state, 
this is always done in copysource workunit generation
         CopySource.serializeCopyableDataset(wus, metadata);
         CopySource.serializeCopyEntity(wus, this.copyEntity);
       }
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/publisher/DeletingCopyDataPublisherTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/publisher/DeletingCopyDataPublisherTest.java
index 3b99f9a..02ce0bb 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/publisher/DeletingCopyDataPublisherTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/publisher/DeletingCopyDataPublisherTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.gobblin.data.management.copy.publisher;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.configuration.WorkUnitState.WorkingState;
@@ -72,7 +73,10 @@ public class DeletingCopyDataPublisherTest {
     CopySource.serializeCopyEntity(wus, cf);
 
     Assert.assertTrue(fs.exists(new Path(testMethodTempPath, "test.txt")));
-
+    // these 2 properties should already be set before the publisher is called
+    wus.setProp(ConfigurationKeys.WRITER_STAGING_DIR, testMethodTempPath + "/" 
+ ConfigurationKeys.STAGING_DIR_DEFAULT_SUFFIX);
+    wus.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, testMethodTempPath + 
"/task-output");
+    wus.setProp(ConfigurationKeys.JOB_ID_KEY, "jobid");
     wus.setWorkingState(WorkingState.SUCCESSFUL);
 
     copyDataPublisher.publishData(ImmutableList.of(wus));
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 7edd645..0856415 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.destination.DestinationDatasetHandlerService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.MDC;
@@ -438,6 +439,10 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
           return;
         }
 
+        // Perform any destination-side work needed before writing begins
+        Boolean canCleanUp = 
this.canCleanStagingData(this.jobContext.getJobState());
+        closer.register(new DestinationDatasetHandlerService(jobState, 
canCleanUp, this.eventSubmitter))
+            .executeHandlers(workUnitStream);
         //Initialize writer and converter(s)
         closer.register(WriterInitializerFactory.newInstace(jobState, 
workUnitStream)).initialize();
         closer.register(ConverterInitializerFactory.newInstance(jobState, 
workUnitStream)).initialize();
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
index 45eb82d..93ea4a2 100644
--- 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
+++ 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
@@ -132,10 +132,9 @@ public class JobLauncherUtils {
    * @param logger a {@link Logger} used for logging
    */
   public static void cleanJobStagingData(State state, Logger logger) throws 
IOException {
-    
Preconditions.checkArgument(state.contains(ConfigurationKeys.WRITER_STAGING_DIR),
-        "Missing required property " + ConfigurationKeys.WRITER_STAGING_DIR);
-    
Preconditions.checkArgument(state.contains(ConfigurationKeys.WRITER_OUTPUT_DIR),
-        "Missing required property " + ConfigurationKeys.WRITER_OUTPUT_DIR);
+    if (!state.contains(ConfigurationKeys.WRITER_STAGING_DIR) || 
!state.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) {
+      return;
+    }
 
     String writerFsUri = 
state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, 
ConfigurationKeys.LOCAL_FS_URI);
     FileSystem fs = getFsWithProxy(state, writerFsUri, 
WriterUtils.getFsConfiguration(state));
@@ -267,7 +266,7 @@ public class JobLauncherUtils {
    * @return
    * @throws IOException
    */
-  private static FileSystem getFsWithProxy(final State state, final String 
fsUri, final Configuration conf) throws IOException {
+  public static FileSystem getFsWithProxy(final State state, final String 
fsUri, final Configuration conf) throws IOException {
     if (!state.getPropAsBoolean(ConfigurationKeys.SHOULD_FS_PROXY_AS_USER,
         ConfigurationKeys.DEFAULT_SHOULD_FS_PROXY_AS_USER)) {
       return FileSystem.get(URI.create(fsUri), conf);

Reply via email to