This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new abd71db [GOBBLIN-1222] Create right abstraction to assemble dataset
staging dir for Hive dataset finder
abd71db is described below
commit abd71dbef280b7ba741343c8634688e7c99de394
Author: Vaibhav Arya <[email protected]>
AuthorDate: Fri Aug 14 10:27:24 2020 -0700
[GOBBLIN-1222] Create right abstraction to assemble dataset staging dir for
Hive dataset finder
Closes #3070 from aryavaibhav93/master
---
.../apache/gobblin/configuration/ConfigurationKeys.java | 1 +
.../apache/gobblin/data/management/copy/CopySource.java | 7 +++++++
.../gobblin/data/management/copy/hive/HiveDataset.java | 8 ++++++++
.../data/management/copy/hive/HiveDatasetFinder.java | 2 ++
.../copy/writer/FileAwareInputStreamDataWriter.java | 15 ++++++++++++++-
5 files changed, 32 insertions(+), 1 deletion(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 98434e1..1300af6 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -209,6 +209,7 @@ public class ConfigurationKeys {
public static final String MAXIMUM_JAR_COPY_RETRY_TIMES_KEY =
JOB_JAR_FILES_KEY + ".uploading.retry.maximum";
public static final String USER_DEFINED_STATIC_STAGING_DIR =
"user.defined.static.staging.dir";
public static final String USER_DEFINED_STAGING_DIR_FLAG =
"user.defined.staging.dir.flag";
+ public static final String IS_DATASET_STAGING_DIR_USED =
"dataset.staging.dir.used";
public static final String QUEUED_TASK_TIME_MAX_SIZE =
"taskexecutor.queued_task_time.history.max_size";
public static final int DEFAULT_QUEUED_TASK_TIME_MAX_SIZE = 2048;
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index e7d2fe9..8b72d54 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -53,6 +53,8 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.copy.extractor.EmptyExtractor;
import
org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractor;
+import org.apache.gobblin.data.management.copy.hive.HiveDataset;
+import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
import
org.apache.gobblin.data.management.copy.prioritization.FileSetComparator;
import
org.apache.gobblin.data.management.copy.publisher.CopyEventSubmitterHelper;
import org.apache.gobblin.data.management.copy.replication.ConfigBasedDataset;
@@ -130,6 +132,8 @@ public class CopySource extends AbstractSource<String,
FileAwareInputStream> {
public static final String FILESET_TOTAL_ENTITIES = "fileset.total.entities";
public static final String FILESET_TOTAL_SIZE_IN_BYTES =
"fileset.total.size";
public static final String SCHEMA_CHECK_ENABLED = "shcema.check.enabled";
+ public static final String DATASET_STAGING_DIR_PATH =
"dataset.staging.dir.path";
+ public static final String DATASET_STAGING_PATH = "dataset.staging.path";
public final static boolean DEFAULT_SCHEMA_CHECK_ENABLED = false;
private static final String WORK_UNIT_WEIGHT = CopyConfiguration.COPY_PREFIX
+ ".workUnitWeight";
@@ -366,6 +370,9 @@ public class CopySource extends AbstractSource<String,
FileAwareInputStream> {
workUnit.setProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA,
((ConfigBasedDataset) this.copyableDataset).getExpectedSchema());
}
}
+ if ((this.copyableDataset instanceof HiveDataset) &&
(state.getPropAsBoolean(ConfigurationKeys.IS_DATASET_STAGING_DIR_USED,false))) {
+ workUnit.setProp(DATASET_STAGING_DIR_PATH, ((HiveDataset)
this.copyableDataset).getProperties().getProperty(DATASET_STAGING_PATH));
+ }
serializeCopyEntity(workUnit, copyEntity);
serializeCopyableDataset(workUnit, metadata);
GobblinMetrics.addCustomTagToState(workUnit,
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
index d550ec9..059efff 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
@@ -67,6 +67,7 @@ import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.request_allocation.PushDownRequestor;
+import static
org.apache.gobblin.data.management.copy.hive.HiveTargetPathHelper.*;
/**
* Hive dataset implementing {@link CopyableDataset}.
@@ -85,6 +86,7 @@ public class HiveDataset implements
PrioritizedCopyableDataset {
public static final String DATASET_NAME_PATTERN_KEY =
"hive.datasetNamePattern";
public static final String DATABASE = "Database";
public static final String TABLE = "Table";
+ public static final String DATASET_STAGING_PATH = "dataset.staging.path";
public static final String DATABASE_TOKEN = "$DB";
public static final String TABLE_TOKEN = "$TABLE";
@@ -93,6 +95,7 @@ public class HiveDataset implements
PrioritizedCopyableDataset {
public static final String LOGICAL_TABLE_TOKEN = "$LOGICAL_TABLE";
// Will not be serialized/de-serialized
+ @Getter
protected transient final Properties properties;
protected transient final FileSystem fs;
protected transient final HiveMetastoreClientPool clientPool;
@@ -125,6 +128,11 @@ public class HiveDataset implements
PrioritizedCopyableDataset {
Optional.fromNullable(this.table.getDataLocation());
this.tableIdentifier = this.table.getDbName() + "." +
this.table.getTableName();
+ Path tableLocation = this.table.getPath();
+ if (!(this.properties.isEmpty())) {
+ String datasetStagingDir =
this.properties.getProperty(COPY_TARGET_TABLE_PREFIX_REPLACEMENT) + "/" +
tableLocation.getName();
+ properties.setProperty(DATASET_STAGING_PATH,datasetStagingDir);
+ }
this.datasetNamePattern =
Optional.fromNullable(ConfigUtils.getString(datasetConfig,
DATASET_NAME_PATTERN_KEY, null));
this.dbAndTable = new DbAndTable(table.getDbName(), table.getTableName());
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinder.java
index e04783f..a73ae70 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinder.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDatasetFinder.java
@@ -29,6 +29,7 @@ import java.util.Properties;
import javax.annotation.Nonnull;
import lombok.Data;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -106,6 +107,7 @@ public class HiveDatasetFinder implements
IterableDatasetFinder<HiveDataset> {
private static final String DATASET_ERROR = "DatasetError";
private static final String FAILURE_CONTEXT = "FailureContext";
+ @Getter
protected final Properties properties;
protected final HiveMetastoreClientPool clientPool;
protected final FileSystem fs;
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 53ca24a..852c9b4 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -88,6 +88,8 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT =
"gobblin.copy.task.overwrite.on.commit";
public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT =
false;
+ public static final String STAGING_DIR_SUFFIX = "/taskStaging";
+ public static final String DATASET_STAGING_DIR_PATH =
"dataset.staging.dir.path";
protected final AtomicLong bytesWritten = new AtomicLong();
protected final AtomicLong filesWritten = new AtomicLong();
@@ -140,16 +142,27 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
this.fs = FileSystem.get(uri, conf);
this.fileContext = FileContext.getFileContext(uri, conf);
+ /**
+ * The staging directory defines the path of staging folder.
+ * USER_DEFINED_STAGING_DIR_FLAG shall be set to true when the user
wants to specify the staging folder; the directory is then read from the
USER_DEFINED_STATIC_STAGING_DIR property.
+ * IS_DATASET_STAGING_DIR_USED, when true, places the staging folder under
the dataset staging path (DATASET_STAGING_DIR_PATH) for dataset copying.
+ * Otherwise the system calculates the staging directory automatically.
+ */
if
(state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false))
{
this.stagingDir = new
Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR));
+ } else if
((state.getPropAsBoolean(ConfigurationKeys.IS_DATASET_STAGING_DIR_USED,false)))
{
+ String stgDir = state.getProp(DATASET_STAGING_DIR_PATH) +
STAGING_DIR_SUFFIX + "/" + state.getProp(ConfigurationKeys.JOB_NAME_KEY ) + "/"
+ state.getProp(ConfigurationKeys.JOB_ID_KEY);
+ state.setProp(ConfigurationKeys.WRITER_STAGING_DIR,stgDir);
+ this.stagingDir = this.writerAttemptIdOptional.isPresent() ?
WriterUtils.getWriterStagingDir(state, numBranches, branchId,
this.writerAttemptIdOptional.get())
+ : WriterUtils.getWriterStagingDir(state, numBranches, branchId);
} else {
this.stagingDir = this.writerAttemptIdOptional.isPresent() ?
WriterUtils.getWriterStagingDir(state, numBranches, branchId,
this.writerAttemptIdOptional.get())
: WriterUtils.getWriterStagingDir(state, numBranches, branchId);
}
- this.outputDir = getOutputDir(state);
this.copyableDatasetMetadata =
CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
+ this.outputDir = getOutputDir(state);
this.recoveryHelper = new RecoveryHelper(this.fs, state);
this.actualProcessedCopyableFile = Optional.absent();