This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new abd71db  [GOBBLIN-1222] Create right abstraction to assemble dataset 
staging dir for Hive dataset finder
abd71db is described below

commit abd71dbef280b7ba741343c8634688e7c99de394
Author: Vaibhav Arya <[email protected]>
AuthorDate: Fri Aug 14 10:27:24 2020 -0700

    [GOBBLIN-1222] Create right abstraction to assemble dataset staging dir for 
Hive dataset finder
    
    Closes #3070 from aryavaibhav93/master
---
 .../apache/gobblin/configuration/ConfigurationKeys.java   |  1 +
 .../apache/gobblin/data/management/copy/CopySource.java   |  7 +++++++
 .../gobblin/data/management/copy/hive/HiveDataset.java    |  8 ++++++++
 .../data/management/copy/hive/HiveDatasetFinder.java      |  2 ++
 .../copy/writer/FileAwareInputStreamDataWriter.java       | 15 ++++++++++++++-
 5 files changed, 32 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 98434e1..1300af6 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -209,6 +209,7 @@ public class ConfigurationKeys {
   public static final String MAXIMUM_JAR_COPY_RETRY_TIMES_KEY = 
JOB_JAR_FILES_KEY + ".uploading.retry.maximum";
   public static final String USER_DEFINED_STATIC_STAGING_DIR = 
"user.defined.static.staging.dir";
   public static final String USER_DEFINED_STAGING_DIR_FLAG = 
"user.defined.staging.dir.flag";
+  public static final String IS_DATASET_STAGING_DIR_USED = 
"dataset.staging.dir.used";
 
   public static final String QUEUED_TASK_TIME_MAX_SIZE = 
"taskexecutor.queued_task_time.history.max_size";
   public static final int DEFAULT_QUEUED_TASK_TIME_MAX_SIZE = 2048;
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index e7d2fe9..8b72d54 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -53,6 +53,8 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.data.management.copy.extractor.EmptyExtractor;
 import 
org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractor;
+import org.apache.gobblin.data.management.copy.hive.HiveDataset;
+import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
 import 
org.apache.gobblin.data.management.copy.prioritization.FileSetComparator;
 import 
org.apache.gobblin.data.management.copy.publisher.CopyEventSubmitterHelper;
 import org.apache.gobblin.data.management.copy.replication.ConfigBasedDataset;
@@ -130,6 +132,8 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
   public static final String FILESET_TOTAL_ENTITIES = "fileset.total.entities";
   public static final String FILESET_TOTAL_SIZE_IN_BYTES = 
"fileset.total.size";
   public static final String SCHEMA_CHECK_ENABLED = "shcema.check.enabled";
+  public static final String DATASET_STAGING_DIR_PATH = 
"dataset.staging.dir.path";
+  public static final String DATASET_STAGING_PATH = "dataset.staging.path";
   public final static boolean DEFAULT_SCHEMA_CHECK_ENABLED = false;
 
   private static final String WORK_UNIT_WEIGHT = CopyConfiguration.COPY_PREFIX 
+ ".workUnitWeight";
@@ -366,6 +370,9 @@ public class CopySource extends AbstractSource<String, 
FileAwareInputStream> {
               workUnit.setProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA, 
((ConfigBasedDataset) this.copyableDataset).getExpectedSchema());
             }
           }
+          if ((this.copyableDataset instanceof HiveDataset) && 
(state.getPropAsBoolean(ConfigurationKeys.IS_DATASET_STAGING_DIR_USED,false))) {
+            workUnit.setProp(DATASET_STAGING_DIR_PATH, ((HiveDataset) 
this.copyableDataset).getProperties().getProperty(DATASET_STAGING_PATH));
+          }
           serializeCopyEntity(workUnit, copyEntity);
           serializeCopyableDataset(workUnit, metadata);
           GobblinMetrics.addCustomTagToState(workUnit,
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
index d550ec9..059efff 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
@@ -67,6 +67,7 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.request_allocation.PushDownRequestor;
 
+import static 
org.apache.gobblin.data.management.copy.hive.HiveTargetPathHelper.*;
 
 /**
  * Hive dataset implementing {@link CopyableDataset}.
@@ -85,6 +86,7 @@ public class HiveDataset implements 
PrioritizedCopyableDataset {
   public static final String DATASET_NAME_PATTERN_KEY = 
"hive.datasetNamePattern";
   public static final String DATABASE = "Database";
   public static final String TABLE = "Table";
+  public static final String DATASET_STAGING_PATH = "dataset.staging.path";
 
   public static final String DATABASE_TOKEN = "$DB";
   public static final String TABLE_TOKEN = "$TABLE";
@@ -93,6 +95,7 @@ public class HiveDataset implements 
PrioritizedCopyableDataset {
   public static final String LOGICAL_TABLE_TOKEN = "$LOGICAL_TABLE";
 
   // Will not be serialized/de-serialized
+  @Getter
   protected transient final Properties properties;
   protected transient final FileSystem fs;
   protected transient final HiveMetastoreClientPool clientPool;
@@ -125,6 +128,11 @@ public class HiveDataset implements 
PrioritizedCopyableDataset {
         Optional.fromNullable(this.table.getDataLocation());
 
     this.tableIdentifier = this.table.getDbName() + "." + 
this.table.getTableName();
+    Path tableLocation = this.table.getPath();
+    if (!(this.properties.isEmpty())) {
+      String datasetStagingDir = 
this.properties.getProperty(COPY_TARGET_TABLE_PREFIX_REPLACEMENT) + "/" + 
tableLocation.getName();
+      properties.setProperty(DATASET_STAGING_PATH,datasetStagingDir);
+    }
 
     this.datasetNamePattern = 
Optional.fromNullable(ConfigUtils.getString(datasetConfig, 
DATASET_NAME_PATTERN_KEY, null));
     this.dbAndTable = new DbAndTable(table.getDbName(), table.getTableName());
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinder.java
index e04783f..a73ae70 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinder.java
@@ -29,6 +29,7 @@ import java.util.Properties;
 import javax.annotation.Nonnull;
 
 import lombok.Data;
+import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.commons.lang3.StringUtils;
@@ -106,6 +107,7 @@ public class HiveDatasetFinder implements 
IterableDatasetFinder<HiveDataset> {
   private static final String DATASET_ERROR = "DatasetError";
   private static final String FAILURE_CONTEXT = "FailureContext";
 
+  @Getter
   protected final Properties properties;
   protected final HiveMetastoreClientPool clientPool;
   protected final FileSystem fs;
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 53ca24a..852c9b4 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -88,6 +88,8 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
   public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
   public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = 
"gobblin.copy.task.overwrite.on.commit";
   public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = 
false;
+  public static final String STAGING_DIR_SUFFIX = "/taskStaging";
+  public static final String DATASET_STAGING_DIR_PATH = 
"dataset.staging.dir.path";
 
   protected final AtomicLong bytesWritten = new AtomicLong();
   protected final AtomicLong filesWritten = new AtomicLong();
@@ -140,16 +142,27 @@ public class FileAwareInputStreamDataWriter extends 
InstrumentedDataWriter<FileA
     this.fs = FileSystem.get(uri, conf);
     this.fileContext = FileContext.getFileContext(uri, conf);
 
+    /**
+     * Determines the writer's staging directory, checking options in this order:
+     * if USER_DEFINED_STAGING_DIR_FLAG is true, the staging folder is taken verbatim from the USER_DEFINED_STATIC_STAGING_DIR property.
+     * Otherwise, if IS_DATASET_STAGING_DIR_USED is true, the staging folder is created under the dataset staging path for dataset copying.
+     * Otherwise, the system computes the staging directory automatically.
+     */
     if 
(state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false)) 
{
       this.stagingDir = new 
Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR));
+    } else if 
((state.getPropAsBoolean(ConfigurationKeys.IS_DATASET_STAGING_DIR_USED,false))) 
{
+      String stgDir = state.getProp(DATASET_STAGING_DIR_PATH) + 
STAGING_DIR_SUFFIX + "/" + state.getProp(ConfigurationKeys.JOB_NAME_KEY ) + "/" 
+ state.getProp(ConfigurationKeys.JOB_ID_KEY);
+      state.setProp(ConfigurationKeys.WRITER_STAGING_DIR,stgDir);
+      this.stagingDir = this.writerAttemptIdOptional.isPresent() ? 
WriterUtils.getWriterStagingDir(state, numBranches, branchId, 
this.writerAttemptIdOptional.get())
+          : WriterUtils.getWriterStagingDir(state, numBranches, branchId);
     } else {
       this.stagingDir = this.writerAttemptIdOptional.isPresent() ? 
WriterUtils.getWriterStagingDir(state, numBranches, branchId, 
this.writerAttemptIdOptional.get())
           : WriterUtils.getWriterStagingDir(state, numBranches, branchId);
     }
 
-    this.outputDir = getOutputDir(state);
     this.copyableDatasetMetadata =
         
CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
+    this.outputDir = getOutputDir(state);
     this.recoveryHelper = new RecoveryHelper(this.fs, state);
     this.actualProcessedCopyableFile = Optional.absent();
 

Reply via email to