This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 46e3cf5 [GOBBLIN-1377] Adds functionality to create shards for target directories in hive distcp
46e3cf5 is described below
commit 46e3cf5da54fd38829653c8af167e128a67e3723
Author: William Lo <[email protected]>
AuthorDate: Thu Mar 18 12:54:03 2021 -0700
[GOBBLIN-1377] Adds functionality to create shards for target directories in hive distcp
Adds handler to create and manage shards
addresses review
sets dataset path to private
addresses last review
fix tests
Closes #3158 from Will-Lo/target-directory-sharding-hive-distcp
---
.../gobblin/configuration/ConfigurationKeys.java | 11 +-
.../destination/DestinationDatasetHandler.java | 25 ++-
.../DestinationDatasetHandlerFactory.java | 37 +++++
.../DestinationDatasetHandlerService.java | 78 +++++++++
.../DestinationDatasetHandlerServiceTest.java | 115 +++++++++++++
.../destination/TestDestinationDatasetHandler.java | 28 +++-
.../gobblin/data/management/copy/CopySource.java | 12 +-
.../data/management/copy/CopyableDatasetBase.java | 1 +
.../management/copy/hive/HiveCopyEntityHelper.java | 184 ++++++++++++---------
.../data/management/copy/hive/HiveDataset.java | 12 +-
.../management/copy/hive/HivePartitionFileSet.java | 2 +-
.../copy/publisher/CopyDataPublisher.java | 19 ++-
.../writer/FileAwareInputStreamDataWriter.java | 14 --
.../data/management/copy/CopySourceTest.java | 43 ++++-
.../management/copy/MockHiveDatasetFinder.java | 93 +++++++++++
.../copy/hive/HiveCopyEntityHelperTest.java | 71 ++++++++
.../copy/publisher/CopyDataPublisherTest.java | 6 +-
.../publisher/DeletingCopyDataPublisherTest.java | 6 +-
.../gobblin/runtime/AbstractJobLauncher.java | 5 +
.../org/apache/gobblin/util/JobLauncherUtils.java | 9 +-
20 files changed, 630 insertions(+), 141 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index abff79e..12f1a76 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -210,7 +210,6 @@ public class ConfigurationKeys {
public static final String MAXIMUM_JAR_COPY_RETRY_TIMES_KEY =
JOB_JAR_FILES_KEY + ".uploading.retry.maximum";
public static final String USER_DEFINED_STATIC_STAGING_DIR =
"user.defined.static.staging.dir";
public static final String USER_DEFINED_STAGING_DIR_FLAG =
"user.defined.staging.dir.flag";
- public static final String IS_DATASET_STAGING_DIR_USED =
"dataset.staging.dir.used";
public static final String QUEUED_TASK_TIME_MAX_SIZE =
"taskexecutor.queued_task_time.history.max_size";
public static final int DEFAULT_QUEUED_TASK_TIME_MAX_SIZE = 2048;
@@ -1062,4 +1061,14 @@ public class ConfigurationKeys {
*/
public static final String TASK_EVENT_METADATA_GENERATOR_CLASS_KEY =
"gobblin.task.event.metadata.generator.class";
public static final String DEFAULT_TASK_EVENT_METADATA_GENERATOR_CLASS_KEY =
"nooptask";
+
+ /**
+ * Configuration for sharded directory files
+ */
+ public static final String USE_DATASET_LOCAL_WORK_DIR =
"gobblin.useDatasetLocalWorkDir";
+ public static final String DESTINATION_DATASET_HANDLER_CLASS =
"gobblin.destination.datasetHandlerClass";
+ public static final String DATASET_DESTINATION_PATH =
"gobblin.dataset.destination.path";
+ public static final String STAGING_DIR_DEFAULT_SUFFIX = "/.temp/taskStaging";
+ public static final String OUTPUT_DIR_DEFAULT_SUFFIX = "/.temp/taskOutput";
+ public static final String ROW_LEVEL_ERR_FILE_DEFAULT_SUFFIX = "/err";
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandler.java
similarity index 56%
copy from
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
copy to
gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandler.java
index c27b839..758634d 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
+++
b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandler.java
@@ -15,14 +15,27 @@
* limitations under the License.
*/
-package org.apache.gobblin.data.management.copy;
-
-import org.apache.gobblin.dataset.Dataset;
+package org.apache.gobblin.destination;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.gobblin.source.workunit.WorkUnit;
/**
- * A common superinterface for {@link Dataset}s that can be operated on by
distcp.
- * Concrete classes must implement a subinterface of this interface ({@link
CopyableDataset} or {@link IterableCopyableDataset}).
+ * Performs work related to initializing the target environment before the
files are written and published
*/
-public interface CopyableDatasetBase extends Dataset {
+public interface DestinationDatasetHandler extends Closeable {
+
+ /**
+ * Handle destination setup before workunits are sent to writer and publisher
+ * @param workUnits
+ */
+ void handle(Collection<WorkUnit> workUnits) throws IOException;
+
+ /**
+ * Perform cleanup if needed
+ * @throws IOException
+ */
+ void close() throws IOException;
}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerFactory.java
b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerFactory.java
new file mode 100644
index 0000000..f3f6831
--- /dev/null
+++
b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.destination;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+
+public class DestinationDatasetHandlerFactory {
+
+ public static DestinationDatasetHandler newInstance(String handlerTypeName,
SourceState state, Boolean canCleanUp) {
+ try {
+ ClassAliasResolver<DestinationDatasetHandler> aliasResolver = new
ClassAliasResolver<>(DestinationDatasetHandler.class);
+ DestinationDatasetHandler handler =
GobblinConstructorUtils.invokeLongestConstructor(
+ aliasResolver.resolveClass(handlerTypeName), state, canCleanUp);
+ return handler;
+ } catch (ReflectiveOperationException rfe) {
+ throw new RuntimeException("Could not construct
DestinationDatasetHandler " + handlerTypeName, rfe);
+ }
+ }
+}
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerService.java
b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerService.java
new file mode 100644
index 0000000..41c768d
--- /dev/null
+++
b/gobblin-core/src/main/java/org/apache/gobblin/destination/DestinationDatasetHandlerService.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.destination;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.apache.gobblin.util.JobLauncherUtils;
+
+
+/**
+ * Initializes and runs handlers on workunits before writers are initialized
+ * Reads {@link ConfigurationKeys#DESTINATION_DATASET_HANDLER_CLASS} as a list
+ * of classes, separated by comma to initialize the handlers
+ */
+public class DestinationDatasetHandlerService implements Closeable {
+ List<DestinationDatasetHandler> handlers;
+
+ public DestinationDatasetHandlerService(SourceState jobState, Boolean
canCleanUp, EventSubmitter eventSubmitter) {
+ this.handlers = new ArrayList<>();
+ if
(jobState.contains(ConfigurationKeys.DESTINATION_DATASET_HANDLER_CLASS)) {
+ List<String> handlerList =
jobState.getPropAsList(ConfigurationKeys.DESTINATION_DATASET_HANDLER_CLASS);
+ for (String handlerClass : handlerList) {
+
this.handlers.add(DestinationDatasetHandlerFactory.newInstance(handlerClass,
jobState, canCleanUp));
+ }
+ }
+ }
+
+ /**
+ * Executes handlers
+ * @param workUnitStream
+ */
+ public void executeHandlers(WorkUnitStream workUnitStream) {
+ if (handlers.size() > 0) {
+ if (workUnitStream.isSafeToMaterialize()) {
+ Collection<WorkUnit> workUnits =
JobLauncherUtils.flattenWorkUnits(workUnitStream.getMaterializedWorkUnitCollection());
+ for (DestinationDatasetHandler handler : this.handlers) {
+ try {
+ handler.handle(workUnits);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("Handler %s failed to
execute", handler.getClass().getName()), e);
+ }
+ }
+ } else {
+ throw new
RuntimeException(DestinationDatasetHandlerService.class.getName() + " does not
support work unit streams");
+ }
+ }
+ }
+
+
+ public void close() throws IOException {
+ for (DestinationDatasetHandler handler: this.handlers) {
+ handler.close();
+ }
+ }
+}
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/destination/DestinationDatasetHandlerServiceTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/destination/DestinationDatasetHandlerServiceTest.java
new file mode 100644
index 0000000..5991cb8
--- /dev/null
+++
b/gobblin-core/src/test/java/org/apache/gobblin/destination/DestinationDatasetHandlerServiceTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.destination;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitStream;
+import org.testng.Assert;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.Test;
+import static org.mockito.Mockito.mock;
+
+
+public class DestinationDatasetHandlerServiceTest {
+
+ EventSubmitter eventSubmitter = null;
+
+ @BeforeSuite
+ void setup() {
+ this.eventSubmitter = mock(EventSubmitter.class);
+ }
+
+ @Test
+ void testSingleHandler() throws Exception {
+ SourceState state = new SourceState();
+ state.setProp(ConfigurationKeys.DESTINATION_DATASET_HANDLER_CLASS,
TestDestinationDatasetHandler.class.getName());
+ DestinationDatasetHandlerService service = new
DestinationDatasetHandlerService(state, true, this.eventSubmitter);
+ List<WorkUnit> workUnits = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ WorkUnit wu = WorkUnit.createEmpty();
+ workUnits.add(wu);
+ }
+ WorkUnitStream workUnitStream = new
BasicWorkUnitStream.Builder(workUnits).build();
+ service.executeHandlers(workUnitStream);
+
+ for (WorkUnit wu: workUnits) {
+
Assert.assertEquals(wu.getPropAsInt(TestDestinationDatasetHandler.TEST_COUNTER_KEY),
1);
+ }
+ }
+
+ @Test
+ void testMultipleHandlers() throws Exception {
+ SourceState state = new SourceState();
+ state.setProp(ConfigurationKeys.DESTINATION_DATASET_HANDLER_CLASS,
+ TestDestinationDatasetHandler.class.getName() + "," +
TestDestinationDatasetHandler.class.getName());
+ DestinationDatasetHandlerService service = new
DestinationDatasetHandlerService(state, true, this.eventSubmitter);
+ List<WorkUnit> workUnits = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ WorkUnit wu = WorkUnit.createEmpty();
+ workUnits.add(wu);
+ }
+ WorkUnitStream workUnitStream = new
BasicWorkUnitStream.Builder(workUnits).build();
+ service.executeHandlers(workUnitStream);
+
+ for (WorkUnit wu: workUnits) {
+ // there were 2 handlers, each should have added to counter
+
Assert.assertEquals(wu.getPropAsInt(TestDestinationDatasetHandler.TEST_COUNTER_KEY),
2);
+ }
+ }
+
+ @Test
+ void testMultipleHandlersWhitespace() throws Exception {
+ SourceState state = new SourceState();
+ // add whitespace in class list
+ state.setProp(ConfigurationKeys.DESTINATION_DATASET_HANDLER_CLASS,
+ TestDestinationDatasetHandler.class.getName() + " , " +
TestDestinationDatasetHandler.class.getName());
+ DestinationDatasetHandlerService service = new
DestinationDatasetHandlerService(state, true, this.eventSubmitter);
+ List<WorkUnit> workUnits = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ WorkUnit wu = WorkUnit.createEmpty();
+ workUnits.add(wu);
+ }
+ WorkUnitStream workUnitStream = new
BasicWorkUnitStream.Builder(workUnits).build();
+ service.executeHandlers(workUnitStream);
+
+ for (WorkUnit wu: workUnits) {
+ // there were 2 handlers, each should have added to counter
+
Assert.assertEquals(wu.getPropAsInt(TestDestinationDatasetHandler.TEST_COUNTER_KEY),
2);
+ }
+ }
+
+ @Test
+ // should not throw an exception
+ void testEmpty() throws Exception {
+ SourceState state = new SourceState();
+ DestinationDatasetHandlerService service = new
DestinationDatasetHandlerService(state, true, this.eventSubmitter);
+ List<WorkUnit> workUnits = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ WorkUnit wu = WorkUnit.createEmpty();
+ workUnits.add(wu);
+ }
+ WorkUnitStream workUnitStream = new
BasicWorkUnitStream.Builder(workUnits).build();
+ service.executeHandlers(workUnitStream);
+ }
+}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
b/gobblin-core/src/test/java/org/apache/gobblin/destination/TestDestinationDatasetHandler.java
similarity index 52%
copy from
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
copy to
gobblin-core/src/test/java/org/apache/gobblin/destination/TestDestinationDatasetHandler.java
index c27b839..102c38c 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
+++
b/gobblin-core/src/test/java/org/apache/gobblin/destination/TestDestinationDatasetHandler.java
@@ -15,14 +15,28 @@
* limitations under the License.
*/
-package org.apache.gobblin.data.management.copy;
+package org.apache.gobblin.destination;
-import org.apache.gobblin.dataset.Dataset;
+import java.io.IOException;
+import java.util.Collection;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.source.workunit.WorkUnit;
-/**
- * A common superinterface for {@link Dataset}s that can be operated on by
distcp.
- * Concrete classes must implement a subinterface of this interface ({@link
CopyableDataset} or {@link IterableCopyableDataset}).
- */
-public interface CopyableDatasetBase extends Dataset {
+public class TestDestinationDatasetHandler implements
DestinationDatasetHandler {
+ public static String TEST_COUNTER_KEY = "counter";
+ private Boolean canCleanUp;
+ public TestDestinationDatasetHandler(SourceState state, Boolean canCleanUp){
+ this.canCleanUp = canCleanUp;
+ }
+
+ @Override
+ public void handle(Collection<WorkUnit> workUnits) {
+ for (WorkUnit wu: workUnits) {
+ wu.setProp(TEST_COUNTER_KEY, wu.getPropAsInt(TEST_COUNTER_KEY, 0) + 1);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {}
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
index 8b72d54..186b7ca 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java
@@ -53,8 +53,6 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.copy.extractor.EmptyExtractor;
import
org.apache.gobblin.data.management.copy.extractor.FileAwareInputStreamExtractor;
-import org.apache.gobblin.data.management.copy.hive.HiveDataset;
-import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
import
org.apache.gobblin.data.management.copy.prioritization.FileSetComparator;
import
org.apache.gobblin.data.management.copy.publisher.CopyEventSubmitterHelper;
import org.apache.gobblin.data.management.copy.replication.ConfigBasedDataset;
@@ -132,8 +130,6 @@ public class CopySource extends AbstractSource<String,
FileAwareInputStream> {
public static final String FILESET_TOTAL_ENTITIES = "fileset.total.entities";
public static final String FILESET_TOTAL_SIZE_IN_BYTES =
"fileset.total.size";
public static final String SCHEMA_CHECK_ENABLED = "shcema.check.enabled";
- public static final String DATASET_STAGING_DIR_PATH =
"dataset.staging.dir.path";
- public static final String DATASET_STAGING_PATH = "dataset.staging.path";
public final static boolean DEFAULT_SCHEMA_CHECK_ENABLED = false;
private static final String WORK_UNIT_WEIGHT = CopyConfiguration.COPY_PREFIX
+ ".workUnitWeight";
@@ -370,9 +366,11 @@ public class CopySource extends AbstractSource<String,
FileAwareInputStream> {
workUnit.setProp(ConfigurationKeys.COPY_EXPECTED_SCHEMA,
((ConfigBasedDataset) this.copyableDataset).getExpectedSchema());
}
}
- if ((this.copyableDataset instanceof HiveDataset) &&
(state.getPropAsBoolean(ConfigurationKeys.IS_DATASET_STAGING_DIR_USED,false))) {
- workUnit.setProp(DATASET_STAGING_DIR_PATH, ((HiveDataset)
this.copyableDataset).getProperties().getProperty(DATASET_STAGING_PATH));
- }
+
+ // Ensure that the writer temporary directories are contained within
the dataset shard
+ String datasetPath = this.copyableDataset.getDatasetPath();
+ workUnit.setProp(ConfigurationKeys.DATASET_DESTINATION_PATH,
datasetPath);
+
serializeCopyEntity(workUnit, copyEntity);
serializeCopyableDataset(workUnit, metadata);
GobblinMetrics.addCustomTagToState(workUnit,
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
index c27b839..077a062 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableDatasetBase.java
@@ -25,4 +25,5 @@ import org.apache.gobblin.dataset.Dataset;
* Concrete classes must implement a subinterface of this interface ({@link
CopyableDataset} or {@link IterableCopyableDataset}).
*/
public interface CopyableDatasetBase extends Dataset {
+ default String getDatasetPath() { return ""; }
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index ad8d167..3fb0d08 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -44,8 +44,10 @@ import lombok.Getter;
import lombok.Singular;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
@@ -82,7 +84,6 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
@@ -141,7 +142,6 @@ public class HiveCopyEntityHelper {
* partitions with Path containing '/Hourly/' will be kept.
*/
public static final String HIVE_PARTITION_EXTENDED_FILTER_TYPE =
HiveDatasetFinder.HIVE_DATASET_PREFIX + ".extendedFilterType";
-
static final Gson gson = new Gson();
private static final String source_client = "source_client";
@@ -168,7 +168,7 @@ public class HiveCopyEntityHelper {
private final HiveDataset dataset;
private final CopyConfiguration configuration;
- private final FileSystem targetFs;
+ private FileSystem targetFs;
private final HiveMetastoreClientPool targetClientPool;
private final String targetDatabase;
@@ -180,15 +180,14 @@ public class HiveCopyEntityHelper {
private final ExistingEntityPolicy existingEntityPolicy;
private final UnmanagedDataPolicy unmanagedDataPolicy;
private final Optional<String> partitionFilter;
- private Optional<? extends HivePartitionExtendedFilter>
hivePartitionExtendedFilter;
+ private final Optional<? extends HivePartitionExtendedFilter>
hivePartitionExtendedFilter;
private final Optional<Predicate<HivePartitionFileSet>> fastPartitionSkip;
private final Optional<Predicate<HiveCopyEntityHelper>> fastTableSkip;
-
private final DeregisterFileDeleteMethod deleteMethod;
private final Optional<CommitStep> tableRegistrationStep;
- private final Map<List<String>, Partition> sourcePartitions;
- private final Map<List<String>, Partition> targetPartitions;
+ private Map<List<String>, Partition> sourcePartitions;
+ private Map<List<String>, Partition> targetPartitions;
private final boolean enforceFileSizeMatch;
private final EventSubmitter eventSubmitter;
@Getter
@@ -265,7 +264,6 @@ public class HiveCopyEntityHelper {
this.dataset = dataset;
this.configuration = configuration;
this.targetFs = targetFs;
-
this.targetPathHelper = new HiveTargetPathHelper(this.dataset);
this.enforceFileSizeMatch = configuration.isEnforceFileLengthMatch();
this.hiveRegProps = new HiveRegProps(new
State(this.dataset.getProperties()));
@@ -287,58 +285,14 @@ public class HiveCopyEntityHelper {
.valueOf(this.dataset.getProperties().getProperty(DELETE_FILES_ON_DEREGISTER).toUpperCase())
: DEFAULT_DEREGISTER_DELETE_METHOD;
- if
(this.dataset.getProperties().containsKey(COPY_PARTITION_FILTER_GENERATOR)) {
- try {
- PartitionFilterGenerator generator =
GobblinConstructorUtils.invokeFirstConstructor(
- (Class<PartitionFilterGenerator>) Class
-
.forName(this.dataset.getProperties().getProperty(COPY_PARTITION_FILTER_GENERATOR)),
- Lists.<Object> newArrayList(this.dataset.getProperties()),
Lists.newArrayList());
- this.partitionFilter =
Optional.of(generator.getFilter(this.dataset));
- log.info(String.format("Dynamic partition filter for table %s: %s.",
this.dataset.table.getCompleteName(),
- this.partitionFilter.get()));
- } catch (ReflectiveOperationException roe) {
- throw new IOException(roe);
- }
- } else {
- this.partitionFilter =
-
Optional.fromNullable(this.dataset.getProperties().getProperty(COPY_PARTITIONS_FILTER_CONSTANT));
- }
-
- // Initialize extended partition filter
- if (
this.dataset.getProperties().containsKey(HIVE_PARTITION_EXTENDED_FILTER_TYPE)){
- String filterType =
dataset.getProperties().getProperty(HIVE_PARTITION_EXTENDED_FILTER_TYPE);
- try {
- Config config =
ConfigFactory.parseProperties(this.dataset.getProperties());
- this.hivePartitionExtendedFilter =
- Optional.of(new
ClassAliasResolver<>(HivePartitionExtendedFilterFactory.class).resolveClass(filterType).newInstance().createFilter(config));
- } catch (ReflectiveOperationException roe) {
- log.error("Error: Could not find filter with alias " + filterType);
- closer.close();
- throw new IOException(roe);
- }
- }
- else {
- this.hivePartitionExtendedFilter = Optional.absent();
- }
-
try {
- this.fastPartitionSkip =
this.dataset.getProperties().containsKey(FAST_PARTITION_SKIP_PREDICATE)
- ? Optional.of(GobblinConstructorUtils.invokeFirstConstructor(
- (Class<Predicate<HivePartitionFileSet>>) Class
-
.forName(this.dataset.getProperties().getProperty(FAST_PARTITION_SKIP_PREDICATE)),
- Lists.<Object> newArrayList(this), Lists.newArrayList()))
- : Optional.<Predicate<HivePartitionFileSet>> absent();
-
- this.fastTableSkip =
this.dataset.getProperties().containsKey(FAST_TABLE_SKIP_PREDICATE)
- ? Optional.of(GobblinConstructorUtils.invokeFirstConstructor(
- (Class<Predicate<HiveCopyEntityHelper>>) Class
-
.forName(this.dataset.getProperties().getProperty(FAST_TABLE_SKIP_PREDICATE)),
- Lists.newArrayList()))
- : Optional.<Predicate<HiveCopyEntityHelper>> absent();
-
- } catch (ReflectiveOperationException roe) {
+ this.partitionFilter = this.initializePartitionFilter();
+ this.hivePartitionExtendedFilter =
this.initializeExtendedPartitionFilter();
+ this.fastPartitionSkip = this.initializePartitionSkipper();
+ this.fastTableSkip = this.initializeTableSkipper();
+ } catch (ReflectiveOperationException e) {
closer.close();
- throw new IOException(roe);
+ throw new IOException(e);
}
Map<String, HiveMetastoreClientPool> namedPools =
@@ -354,13 +308,14 @@ public class HiveCopyEntityHelper {
this.existingTargetTable = Optional.absent();
}
- // Constructing CommitStep object for table registration
- Path targetPath = getTargetLocation(this.dataset.fs, this.targetFs,
this.dataset.table.getDataLocation(),
- Optional.<Partition> absent());
+ Path targetPath = getTargetLocation(this.targetFs,
this.dataset.table.getDataLocation(), Optional.<Partition>absent());
+ this.dataset.setDatasetPath(targetPath.toUri().getRawPath());
+
this.targetTable = getTargetTable(this.dataset.table, targetPath);
HiveSpec tableHiveSpec = new SimpleHiveSpec.Builder<>(targetPath)
.withTable(HiveMetaStoreUtils.getHiveTable(this.targetTable.getTTable())).build();
+ // Constructing CommitStep object for table registration
CommitStep tableRegistrationStep =
new HiveRegisterStep(this.targetMetastoreURI, tableHiveSpec,
this.hiveRegProps);
this.tableRegistrationStep = Optional.of(tableRegistrationStep);
@@ -368,21 +323,7 @@ public class HiveCopyEntityHelper {
if (this.existingTargetTable.isPresent() &&
this.existingTargetTable.get().isPartitioned()) {
checkPartitionedTableCompatibility(this.targetTable,
this.existingTargetTable.get());
}
- if (this.dataset.table.isPartitioned()) {
- this.sourcePartitions =
HiveUtils.getPartitionsMap(multiClient.getClient(source_client),
this.dataset.table,
- this.partitionFilter, this.hivePartitionExtendedFilter);
-
HiveAvroCopyEntityHelper.updatePartitionAttributesIfAvro(this.targetTable,
this.sourcePartitions, this);
-
- // Note: this must be mutable, so we copy the map
- this.targetPartitions =
- this.existingTargetTable.isPresent() ? Maps.newHashMap(
-
HiveUtils.getPartitionsMap(multiClient.getClient(target_client),
- this.existingTargetTable.get(), this.partitionFilter,
this.hivePartitionExtendedFilter))
- : Maps.<List<String>, Partition> newHashMap();
- } else {
- this.sourcePartitions = Maps.newHashMap();
- this.targetPartitions = Maps.newHashMap();
- }
+ initializeSourceAndTargetTablePartitions(multiClient);
} catch (TException te) {
closer.close();
@@ -392,6 +333,91 @@ public class HiveCopyEntityHelper {
}
/**
+ * Checks {@value COPY_PARTITION_FILTER_GENERATOR} in configuration to
determine which class to use for hive filtering
+ * Default is to filter based on {@value COPY_PARTITIONS_FILTER_CONSTANT}, a
constant regex
+ * @throws ReflectiveOperationException if the generator class in the
configuration is not found
+ */
+ private Optional<String> initializePartitionFilter() throws
ReflectiveOperationException {
+ if
(this.dataset.getProperties().containsKey(COPY_PARTITION_FILTER_GENERATOR)) {
+ // The generator class is loaded reflectively and offered the dataset Properties as its
+ // constructor argument. The cast below is unchecked, so an incompatible class name is not
+ // rejected by the cast itself.
+ PartitionFilterGenerator generator =
GobblinConstructorUtils.invokeFirstConstructor(
+ (Class<PartitionFilterGenerator>) Class.forName(
+
this.dataset.getProperties().getProperty(COPY_PARTITION_FILTER_GENERATOR)),
+ Lists.<Object>newArrayList(this.dataset.getProperties()),
Lists.newArrayList());
+ // Optional.of: a null filter returned by the generator would fail here.
+ Optional<String> partitionFilter =
Optional.of(generator.getFilter(this.dataset));
+ log.info(String.format("Dynamic partition filter for table %s: %s.",
this.dataset.table.getCompleteName(),
+ partitionFilter.get()));
+ return partitionFilter;
+ } else {
+ // No generator configured: fall back to the constant filter, absent if that key is unset too.
+ return
Optional.fromNullable(this.dataset.getProperties().getProperty(COPY_PARTITIONS_FILTER_CONSTANT));
+ }
+ }
+
+ /**
+ * Checks {@value HIVE_PARTITION_EXTENDED_FILTER_TYPE} in configuration to
initialize more granular filtering class
+ * Default is to use none
+ * @throws ReflectiveOperationException if the filter class in the
configuration is not found
+ */
+ private Optional<HivePartitionExtendedFilter>
initializeExtendedPartitionFilter() throws IOException,
ReflectiveOperationException {
+ // NOTE(review): IOException is declared but nothing visible here throws it; presumably it
+ // comes from the factory's createFilter. Confirm before documenting it as a contract.
+ if
(this.dataset.getProperties().containsKey(HIVE_PARTITION_EXTENDED_FILTER_TYPE)){
+ // filterType is resolved through ClassAliasResolver, so it is presumably either a registered
+ // alias or a fully-qualified factory class name.
+ String filterType =
dataset.getProperties().getProperty(HIVE_PARTITION_EXTENDED_FILTER_TYPE);
+ // The entire dataset Properties are handed to the filter as a Typesafe Config.
+ Config config =
ConfigFactory.parseProperties(this.dataset.getProperties());
+ // The factory is instantiated through its no-arg constructor (Class#newInstance).
+ return Optional.of(new
ClassAliasResolver<>(HivePartitionExtendedFilterFactory.class).resolveClass(filterType).newInstance().createFilter(config));
+ } else {
+ return Optional.absent();
+ }
+ }
+
+ /**
+ * Checks {@value FAST_PARTITION_SKIP_PREDICATE} in configuration to
determine the class used to find which hive partitions to skip
+ * Default is to skip none
+ * @throws ReflectiveOperationException if class in configuration is not
found
+ */
+ private Optional<Predicate<HivePartitionFileSet>>
initializePartitionSkipper() throws ReflectiveOperationException {
+ // Two candidate argument lists are offered: (this helper) and (). Presumably the first matching
+ // constructor wins; confirm against GobblinConstructorUtils#invokeFirstConstructor.
+ // NOTE(review): if this runs from the constructor, the predicate receives a partially
+ // initialized HiveCopyEntityHelper. Confirm the predicate does not read unset state eagerly.
+ return
this.dataset.getProperties().containsKey(FAST_PARTITION_SKIP_PREDICATE)
+ ? Optional.of(GobblinConstructorUtils.invokeFirstConstructor(
+ (Class<Predicate<HivePartitionFileSet>>) Class
+
.forName(this.dataset.getProperties().getProperty(FAST_PARTITION_SKIP_PREDICATE)),
+ Lists.<Object> newArrayList(this), Lists.newArrayList()))
+ : Optional.<Predicate<HivePartitionFileSet>> absent();
+ }
+
+ /**
+ * Checks {@value FAST_TABLE_SKIP_PREDICATE} in configuration to determine
the class used to find which hive tables to skip
+ * Default is to skip none
+ * @throws ReflectiveOperationException if class in configuration is not
found
+ */
+ private Optional<Predicate<HiveCopyEntityHelper>> initializeTableSkipper()
throws ReflectiveOperationException {
+ // Unlike the partition skipper, only an empty argument list is offered here, so the predicate
+ // class presumably needs a no-arg constructor. Confirm against GobblinConstructorUtils.
+ // The cast to Class<Predicate<HiveCopyEntityHelper>> is unchecked.
+ return this.dataset.getProperties().containsKey(FAST_TABLE_SKIP_PREDICATE)
+ ? Optional.of(GobblinConstructorUtils.invokeFirstConstructor(
+ (Class<Predicate<HiveCopyEntityHelper>>) Class
+
.forName(this.dataset.getProperties().getProperty(FAST_TABLE_SKIP_PREDICATE)),
+ Lists.newArrayList()))
+ : Optional.<Predicate<HiveCopyEntityHelper>> absent();
+ }
+
+ /**
+ * Initializes the corresponding source and target partitions after applying
the hive partition filters
+ * @param multiClient a map of {@link IMetaStoreClient}
+ * @throws IOException if encountering a hive error when determining
partitions
+ */
+ private void
initializeSourceAndTargetTablePartitions(HiveMetastoreClientPool.MultiClient
multiClient) throws IOException {
+ // Reads this.targetTable and this.existingTargetTable, so it must run only after both have
+ // been assigned.
+ if (this.dataset.table.isPartitioned()) {
+ // Source partitions are fetched with the same partition filter and extended filter that
+ // are later applied to the target table, so both maps cover the same partition subset.
+ this.sourcePartitions =
HiveUtils.getPartitionsMap(multiClient.getClient(source_client),
this.dataset.table, this.partitionFilter,
+ this.hivePartitionExtendedFilter);
+
HiveAvroCopyEntityHelper.updatePartitionAttributesIfAvro(this.targetTable,
this.sourcePartitions, this);
+
+ // Note: this must be mutable, so we copy the map
+ this.targetPartitions = this.existingTargetTable.isPresent() ?
Maps.newHashMap(
+ HiveUtils.getPartitionsMap(multiClient.getClient(target_client),
this.existingTargetTable.get(), this.partitionFilter,
+ this.hivePartitionExtendedFilter))
+ : Maps.<List<String>, Partition>newHashMap();
+ } else {
+ // Non-partitioned tables get empty partition maps on both sides.
+ this.sourcePartitions = Maps.newHashMap();
+ this.targetPartitions = Maps.newHashMap();
+ }
+ }
+
+ /**
* See {@link #getCopyEntities(CopyConfiguration, Comparator,
PushDownRequestor)}. This method does not pushdown any prioritizer.
*/
Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration
configuration) throws IOException {
@@ -639,7 +665,7 @@ public class HiveCopyEntityHelper {
Map<Path, FileStatus> desiredTargetExistingPaths;
try {
desiredTargetExistingPaths = desiredTargetLocation.getPaths();
- } catch (InvalidInputException ioe) {
+ } catch (IOException ioe) {
// Thrown if inputFormat cannot find location in target. Since location
doesn't exist, this set is empty.
desiredTargetExistingPaths = Maps.newHashMap();
}
@@ -791,14 +817,12 @@ public class HiveCopyEntityHelper {
/**
* Compute the target location for a Hive location.
- * @param sourceFs Source {@link FileSystem}.
* @param path source {@link Path} in Hive location.
* @param partition partition these paths correspond to.
* @return transformed location in the target.
* @throws IOException if cannot generate a single target location.
*/
- Path getTargetLocation(FileSystem sourceFs, FileSystem targetFs, Path path,
Optional<Partition> partition)
- throws IOException {
+ Path getTargetLocation(FileSystem targetFs, Path path, Optional<Partition>
partition) {
return getTargetPathHelper().getTargetPath(path, targetFs, partition,
false);
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
index 059efff..319b980 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveDataset.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Properties;
import lombok.Getter;
+import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
@@ -86,7 +87,6 @@ public class HiveDataset implements
PrioritizedCopyableDataset {
public static final String DATASET_NAME_PATTERN_KEY =
"hive.datasetNamePattern";
public static final String DATABASE = "Database";
public static final String TABLE = "Table";
- public static final String DATASET_STAGING_PATH = "dataset.staging.path";
public static final String DATABASE_TOKEN = "$DB";
public static final String TABLE_TOKEN = "$TABLE";
@@ -94,6 +94,10 @@ public class HiveDataset implements
PrioritizedCopyableDataset {
public static final String LOGICAL_DB_TOKEN = "$LOGICAL_DB";
public static final String LOGICAL_TABLE_TOKEN = "$LOGICAL_TABLE";
+ @Getter
+ @Setter
+ private String datasetPath;
+
// Will not be serialized/de-serialized
@Getter
protected transient final Properties properties;
@@ -128,11 +132,6 @@ public class HiveDataset implements
PrioritizedCopyableDataset {
Optional.fromNullable(this.table.getDataLocation());
this.tableIdentifier = this.table.getDbName() + "." +
this.table.getTableName();
- Path tableLocation = this.table.getPath();
- if (!(this.properties.isEmpty())) {
- String datasetStagingDir =
this.properties.getProperty(COPY_TARGET_TABLE_PREFIX_REPLACEMENT) + "/" +
tableLocation.getName();
- properties.setProperty(DATASET_STAGING_PATH,datasetStagingDir);
- }
this.datasetNamePattern =
Optional.fromNullable(ConfigUtils.getString(datasetConfig,
DATASET_NAME_PATTERN_KEY, null));
this.dbAndTable = new DbAndTable(table.getDbName(), table.getTableName());
@@ -338,4 +337,5 @@ public class HiveDataset implements
PrioritizedCopyableDataset {
}
return true;
}
+
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
index 98f30da..8f1f208 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
@@ -85,7 +85,7 @@ public class HivePartitionFileSet extends HiveFileSet {
stepPriority = hiveCopyEntityHelper.addSharedSteps(copyEntities,
fileSet, stepPriority);
multiTimer.nextStage(HiveCopyEntityHelper.Stages.COMPUTE_TARGETS);
- Path targetPath =
hiveCopyEntityHelper.getTargetLocation(hiveCopyEntityHelper.getDataset().fs,
hiveCopyEntityHelper.getTargetFs(),
+ Path targetPath =
hiveCopyEntityHelper.getTargetLocation(hiveCopyEntityHelper.getTargetFs(),
this.partition.getDataLocation(), Optional.of(this.partition));
Partition targetPartition = getTargetPartition(this.partition,
targetPath);
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index 41d4d85..f143690 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -79,8 +79,6 @@ import org.apache.gobblin.util.WriterUtils;
@Slf4j
public class CopyDataPublisher extends DataPublisher implements
UnpublishedHandling {
- private final Path writerOutputDir;
-
@Override
public boolean isThreadSafe() {
return this.getClass() == CopyDataPublisher.class;
@@ -116,10 +114,6 @@ public class CopyDataPublisher extends DataPublisher
implements UnpublishedHandl
String uri = this.state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI,
ConfigurationKeys.LOCAL_FS_URI);
this.fs = FileSystem.get(URI.create(uri),
WriterUtils.getFsConfiguration(state));
- FileAwareInputStreamDataWriterBuilder.setJobSpecificOutputPaths(state);
-
- this.writerOutputDir = new
Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
-
MetricContext metricContext =
Instrumented.getMetricContext(state, CopyDataPublisher.class,
GobblinMetrics.getCustomTagsFromState(state));
@@ -225,9 +219,16 @@ public class CopyDataPublisher extends DataPublisher
implements UnpublishedHandl
Preconditions.checkArgument(!datasetWorkUnitStates.isEmpty(),
"publishFileSet received an empty collection work units. This is an
error in code.");
- CopyableDatasetMetadata metadata = CopyableDatasetMetadata
-
.deserialize(datasetWorkUnitStates.iterator().next().getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
- Path datasetWriterOutputPath = new Path(this.writerOutputDir,
datasetAndPartition.identifier());
+ WorkUnitState sampledWorkUnitState =
datasetWorkUnitStates.iterator().next();
+
+ CopyableDatasetMetadata metadata = CopyableDatasetMetadata.deserialize(
+ sampledWorkUnitState.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
+
+ // If not already done, ensure that the writer outputs have the job ID
appended to avoid corruption from previous runs
+
FileAwareInputStreamDataWriterBuilder.setJobSpecificOutputPaths(sampledWorkUnitState);
+ Path writerOutputDir = new
Path(sampledWorkUnitState.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
+
+ Path datasetWriterOutputPath = new Path(writerOutputDir,
datasetAndPartition.identifier());
log.info("Merging all split work units.");
DistcpFileSplitter.mergeAllSplitWorkUnits(this.fs, datasetWorkUnitStates);
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index cba3b92..0c99088 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -84,8 +84,6 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT =
"gobblin.copy.task.overwrite.on.commit";
public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT =
false;
- public static final String STAGING_DIR_SUFFIX = "/taskStaging";
- public static final String DATASET_STAGING_DIR_PATH =
"dataset.staging.dir.path";
protected final AtomicLong bytesWritten = new AtomicLong();
protected final AtomicLong filesWritten = new AtomicLong();
@@ -146,20 +144,8 @@ public class FileAwareInputStreamDataWriter extends
InstrumentedDataWriter<FileA
this.fs = FileSystem.get(uri, conf);
}
this.fileContext = FileContext.getFileContext(uri, conf);
-
- /**
- * The staging directory defines the path of staging folder.
- * USER_DEFINED_STATIC_STAGING_DIR_FLAG shall be set to true when user
wants to specify the staging folder and the directory can be fetched through
USER_DEFINED_STATIC_STAGING_DIR property.
- * IS_DATASET_STAGING_DIR_USED when true creates the staging folder within
a dataset location for dataset copying.
- * Else system will calculate the staging directory automatically.
- */
if
(state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false))
{
this.stagingDir = new
Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR));
- } else if
((state.getPropAsBoolean(ConfigurationKeys.IS_DATASET_STAGING_DIR_USED,false)))
{
- String stgDir = state.getProp(DATASET_STAGING_DIR_PATH) +
STAGING_DIR_SUFFIX + "/" + state.getProp(ConfigurationKeys.JOB_NAME_KEY ) + "/"
+ state.getProp(ConfigurationKeys.JOB_ID_KEY);
- state.setProp(ConfigurationKeys.WRITER_STAGING_DIR,stgDir);
- this.stagingDir = this.writerAttemptIdOptional.isPresent() ?
WriterUtils.getWriterStagingDir(state, numBranches, branchId,
this.writerAttemptIdOptional.get())
- : WriterUtils.getWriterStagingDir(state, numBranches, branchId);
} else {
this.stagingDir = this.writerAttemptIdOptional.isPresent() ?
WriterUtils.getWriterStagingDir(state, numBranches, branchId,
this.writerAttemptIdOptional.get())
: WriterUtils.getWriterStagingDir(state, numBranches, branchId);
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
index 377b2cf..7db7b07 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopySourceTest.java
@@ -16,13 +16,16 @@
*/
package org.apache.gobblin.data.management.copy;
-
+import com.google.common.io.Files;
+import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.List;
-
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
@@ -190,4 +193,40 @@ public class CopySourceTest {
Assert.assertEquals(fileSet.getTotalEntities(), 5);
Assert.assertEquals(fileSet.getTotalSizeInBytes(), 50);
}
+
+ /**
+ * Verifies that, with {@link MockHiveDatasetFinder} supplying three tables under tempDirRoot/testPath,
+ * the generated work units record destination paths rewritten to tempDirRoot/targetPath.
+ */
+ @Test
+ public void testDefaultHiveDatasetShardTempPaths()
+ throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+ SourceState state = new SourceState();
+ File tempDir = Files.createTempDir();
+ String tempDirRoot = tempDir.getPath();
+ // NOTE(review): deleteOnExit only removes the directory if it is empty at JVM exit, and the
+ // mock finder creates sub-directories beneath it, so residue may remain after the run.
+ tempDir.deleteOnExit();
+
+ state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+ state.setProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI, "file:///");
+ // Using a mock class, so the finder will always find 3 tables regardless of this setting.
+ state.setProp("hive.dataset.whitelist", "testDB.table*");
+ state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target/dir");
+ state.setProp(DatasetUtils.DATASET_PROFILE_CLASS_KEY, MockHiveDatasetFinder.class.getName());
+ state.setProp(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR, "true");
+ state.setProp("tempDirRoot", tempDirRoot);
+ state.setProp(CopyConfiguration.STORE_REJECTED_REQUESTS_KEY,
+ RequestAllocatorConfig.StoreRejectedRequestsConfig.ALL.name().toLowerCase());
+ state.setProp(ConfigurationKeys.JOB_NAME_KEY, "jobName");
+ state.setProp(ConfigurationKeys.JOB_ID_KEY, "jobId");
+ CopySource source = new CopySource();
+
+ List<WorkUnit> workunits = source.getWorkunits(state);
+ workunits = JobLauncherUtils.flattenWorkUnits(workunits);
+ // Work units are created for pre and post publish steps.
+ Assert.assertEquals(workunits.size(), 6);
+
+ // Work units are not guaranteed to be created in any order; collapse duplicate paths.
+ Set<String> datasetPaths = workunits.stream()
+ .map(w -> w.getProp(ConfigurationKeys.DATASET_DESTINATION_PATH))
+ .collect(Collectors.toSet());
+
+ for (int i = 0; i < 3; i++) {
+ String expectedPath = tempDirRoot + "/targetPath/testDB/table" + i;
+ Assert.assertTrue(datasetPaths.contains(expectedPath),
+ "Missing destination path " + expectedPath + " in " + datasetPaths);
+ }
+ }
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/MockHiveDatasetFinder.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/MockHiveDatasetFinder.java
new file mode 100644
index 0000000..98d580c
--- /dev/null
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/MockHiveDatasetFinder.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.data.management.copy.hive.HiveDataset;
+import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
+import org.apache.gobblin.data.management.copy.hive.HiveTargetPathHelper;
+import org.apache.gobblin.hive.HiveMetastoreClientPool;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.metadata.Table;
+
+
+/**
+ * Test-only {@link HiveDatasetFinder} that ignores the metastore and emits one {@link HiveDataset}
+ * per hard-coded table. It creates each table's data directory under tempDirRoot/testPath and
+ * configures prefix replacement so targets land under tempDirRoot/targetPath.
+ */
+public class MockHiveDatasetFinder extends HiveDatasetFinder {
+
+ /** Root directory for fake table data, read from the "tempDirRoot" property. */
+ final Path tempDirRoot;
+
+ public MockHiveDatasetFinder(LocalFileSystem fs, Properties properties,
+ EventSubmitter eventSubmitter) throws IOException {
+ super(fs, properties, eventSubmitter);
+ this.tempDirRoot = new Path(properties.getProperty("tempDirRoot"));
+ }
+
+ /** Always returns testDB.table0, testDB.table1 and testDB.table2, regardless of any whitelist. */
+ public Collection<DbAndTable> getTables() throws IOException {
+ List<DbAndTable> tables = Lists.newArrayList();
+
+ for (int i = 0; i < 3; i++) {
+ tables.add(new DbAndTable("testDB", "table" + i));
+ }
+
+ return tables;
+ }
+
+ @Override
+ public Iterator<HiveDataset> getDatasetsIterator() throws IOException {
+ return new AbstractIterator<HiveDataset>() {
+ private final Iterator<DbAndTable> tables = getTables().iterator();
+
+ @Override
+ protected HiveDataset computeNext() {
+ // Each table yields exactly one dataset; stop once the table list is exhausted.
+ if (!this.tables.hasNext()) {
+ return endOfData();
+ }
+ DbAndTable dbAndTable = this.tables.next();
+ try {
+ File dbPath = new File(tempDirRoot + "/testPath/testDB/" + dbAndTable.getTable());
+ fs.mkdirs(new Path(dbPath.getAbsolutePath()));
+ Properties hiveProperties = new Properties();
+ hiveProperties.setProperty(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR, "true");
+ // Rewrite <tempDirRoot>/testPath/... source locations to <tempDirRoot>/targetPath/...
+ hiveProperties.setProperty(HiveTargetPathHelper.COPY_TARGET_TABLE_PREFIX_TOBE_REPLACED, tempDirRoot + "/testPath");
+ hiveProperties.setProperty(HiveTargetPathHelper.COPY_TARGET_TABLE_PREFIX_REPLACEMENT, tempDirRoot + "/targetPath");
+ Table table = new Table(Table.getEmptyTable(dbAndTable.getDb(), dbAndTable.getTable()));
+ table.setDataLocation(new Path(dbPath.getPath()));
+ HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(new Properties(), Optional.absent());
+ return new HiveDataset(new LocalFileSystem(), pool, table, hiveProperties);
+ } catch (IOException e) {
+ // Throwables.propagate always throws; the explicit throw makes that visible to readers
+ // and the compiler.
+ throw Throwables.propagate(e);
+ }
+ }
+ };
+ }
+}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
index fc35bc1..5303c40 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
@@ -34,6 +34,10 @@ import
org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import
org.apache.gobblin.data.management.copy.hive.HiveCopyEntityHelper.DeregisterFileDeleteMethod;
import org.apache.gobblin.hive.HiveRegProps;
import org.apache.gobblin.metrics.event.MultiTimingEvent;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.hive.HiveMetastoreClientPool;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
@@ -361,6 +365,73 @@ public class HiveCopyEntityHelperTest {
Assert.assertFalse(meta_table.getSd().getSerdeInfo().getParameters().containsKey("path"));
}
+ /** Without prefix-replacement properties, getTargetLocation must return the source path unchanged. */
+ @Test
+ public void testGetTargetLocationDefault() throws Exception {
+ Properties copyProperties = new Properties();
+ copyProperties.put(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target");
+ Path testPath = new Path("/testPath");
+ Properties hiveProperties = new Properties();
+ Table table = new Table(Table.getEmptyTable("testDB", "testTable"));
+ table.setDataLocation(testPath);
+ HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(new Properties(), Optional.absent());
+ HiveDataset dataset = new HiveDataset(new LocalFileSystem(), pool, table, hiveProperties);
+
+ HiveCopyEntityHelper helper = new HiveCopyEntityHelper(dataset,
+ CopyConfiguration.builder(FileSystem.getLocal(new Configuration()), copyProperties).build(),
+ new LocalFileSystem());
+
+ FileSystem fs = new LocalFileSystem();
+ // Test that, by default, the input path is the same as the output path.
+ Path path = helper.getTargetLocation(fs, testPath, Optional.<Partition>absent());
+ // TestNG's assertEquals signature is (actual, expected).
+ Assert.assertEquals(path.toUri().getRawPath(), testPath.toUri().getRawPath());
+ }
+
+ // Constructing the helper should record the computed target location as the dataset path.
+ // With no prefix-replacement properties, that location equals the table's data location.
+ @Test
+ public void testSetsDatasetShardPath() throws Exception {
+
+ Properties copyProperties = new Properties();
+ copyProperties.put(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target");
+ Path testPath = new Path("/testPath/db/table");
+ Properties hiveProperties = new Properties();
+ hiveProperties.setProperty(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR,
"true");
+ Table table = new Table(Table.getEmptyTable("testDB", "testTable"));
+ table.setDataLocation(testPath);
+ HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(new
Properties(), Optional.absent());
+ HiveDataset dataset = new HiveDataset(new LocalFileSystem(), pool, table,
hiveProperties);
+
+ // The helper constructor is what sets the dataset path; no further calls are needed.
+ HiveCopyEntityHelper helper = new HiveCopyEntityHelper(dataset,
+ CopyConfiguration.builder(FileSystem.getLocal(new Configuration()),
copyProperties).build(),
+ new LocalFileSystem()
+ );
+
+ Assert.assertEquals(helper.getDataset().getDatasetPath(),
"/testPath/db/table");
+ }
+
+ // With prefix replacement configured, the recorded dataset path must be the rewritten target
+ // location (/testPath -> /targetPath), not the source data location.
+ @Test
+ public void testSetsDatasetShardPathWithReplacement() throws Exception {
+
+ Properties copyProperties = new Properties();
+ copyProperties.put(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/target");
+ Path testPath = new Path("/testPath/db/table");
+ Properties hiveProperties = new Properties();
+ hiveProperties.setProperty(ConfigurationKeys.USE_DATASET_LOCAL_WORK_DIR,
"true");
+
hiveProperties.setProperty(HiveTargetPathHelper.COPY_TARGET_TABLE_PREFIX_TOBE_REPLACED,
"/testPath");
+
hiveProperties.setProperty(HiveTargetPathHelper.COPY_TARGET_TABLE_PREFIX_REPLACEMENT,
"/targetPath");
+ Table table = new Table(Table.getEmptyTable("testDB", "testTable"));
+ table.setDataLocation(testPath);
+ HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(new
Properties(), Optional.absent());
+ HiveDataset dataset = new HiveDataset(new LocalFileSystem(), pool, table,
hiveProperties);
+
+ HiveCopyEntityHelper helper = new HiveCopyEntityHelper(dataset,
+ CopyConfiguration.builder(FileSystem.getLocal(new Configuration()),
copyProperties).build(),
+ new LocalFileSystem()
+ );
+
+ Assert.assertEquals(helper.getDataset().getDatasetPath(),
"/targetPath/db/table");
+ }
+
private boolean containsPath(Collection<FileStatus> statuses, Path path) {
for (FileStatus status : statuses) {
if (status.getPath().equals(path)) {
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisherTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisherTest.java
index 498b244..4e5e8e5 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisherTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisherTest.java
@@ -236,11 +236,12 @@ public class CopyDataPublisherTest {
private Path targetPath;
private FileSystem fs;
private CopyEntity copyEntity;
+ State state;
private void createDatasetFiles() throws IOException {
// Create writer output files
Path datasetWriterOutputPath =
- new Path(writerOutputPath,
copyEntity.getDatasetAndPartition(this.metadata).identifier());
+ new Path(writerOutputPath + "/" +
state.getProp(ConfigurationKeys.JOB_ID_KEY),
copyEntity.getDatasetAndPartition(this.metadata).identifier());
Path outputPathWithCurrentDirectory = new Path(datasetWriterOutputPath,
PathUtils.withoutLeadingSeparator(this.targetPath));
for (String path : relativeFilePaths) {
@@ -259,7 +260,7 @@ public class CopyDataPublisherTest {
this.metadata = new CopyableDatasetMetadata(this.copyableDataset);
this.relativeFilePaths = relativeFilePaths;
this.writerOutputPath = new
Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR));
-
+ this.state = state;
this.targetPath = new Path(testMethodTempPath, datasetTargetPath);
FileStatus file = new FileStatus(0, false, 0, 0, 0, new Path("/file"));
@@ -277,6 +278,7 @@ public class CopyDataPublisherTest {
List<WorkUnitState> workUnitStates =
Lists.newArrayList(new WorkUnitState(), new WorkUnitState(), new
WorkUnitState());
for (WorkUnitState wus : workUnitStates) {
+ wus.addAll(this.state); // propagate job state into work unit state,
this is always done in copysource workunit generation
CopySource.serializeCopyableDataset(wus, metadata);
CopySource.serializeCopyEntity(wus, this.copyEntity);
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/publisher/DeletingCopyDataPublisherTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/publisher/DeletingCopyDataPublisherTest.java
index 3b99f9a..02ce0bb 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/publisher/DeletingCopyDataPublisherTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/publisher/DeletingCopyDataPublisherTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.gobblin.data.management.copy.publisher;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.configuration.WorkUnitState.WorkingState;
@@ -72,7 +73,10 @@ public class DeletingCopyDataPublisherTest {
CopySource.serializeCopyEntity(wus, cf);
Assert.assertTrue(fs.exists(new Path(testMethodTempPath, "test.txt")));
-
+ // these 2 properties should already be set before the publisher is called
+ wus.setProp(ConfigurationKeys.WRITER_STAGING_DIR, testMethodTempPath + "/"
+ ConfigurationKeys.STAGING_DIR_DEFAULT_SUFFIX);
+ wus.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, testMethodTempPath +
"/task-output");
+ wus.setProp(ConfigurationKeys.JOB_ID_KEY, "jobid");
wus.setWorkingState(WorkingState.SUCCESSFUL);
copyDataPublisher.publishData(ImmutableList.of(wus));
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index 7edd645..0856415 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
+import org.apache.gobblin.destination.DestinationDatasetHandlerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
@@ -438,6 +439,10 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
return;
}
+ // Perform work needed before writing is done
+ Boolean canCleanUp =
this.canCleanStagingData(this.jobContext.getJobState());
+ closer.register(new DestinationDatasetHandlerService(jobState,
canCleanUp, this.eventSubmitter))
+ .executeHandlers(workUnitStream);
//Initialize writer and converter(s)
closer.register(WriterInitializerFactory.newInstace(jobState,
workUnitStream)).initialize();
closer.register(ConverterInitializerFactory.newInstance(jobState,
workUnitStream)).initialize();
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
index 45eb82d..93ea4a2 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/JobLauncherUtils.java
@@ -132,10 +132,9 @@ public class JobLauncherUtils {
* @param logger a {@link Logger} used for logging
*/
public static void cleanJobStagingData(State state, Logger logger) throws
IOException {
-
Preconditions.checkArgument(state.contains(ConfigurationKeys.WRITER_STAGING_DIR),
- "Missing required property " + ConfigurationKeys.WRITER_STAGING_DIR);
-
Preconditions.checkArgument(state.contains(ConfigurationKeys.WRITER_OUTPUT_DIR),
- "Missing required property " + ConfigurationKeys.WRITER_OUTPUT_DIR);
+ if (!state.contains(ConfigurationKeys.WRITER_STAGING_DIR) ||
!state.contains(ConfigurationKeys.WRITER_OUTPUT_DIR)) {
+ return;
+ }
String writerFsUri =
state.getProp(ConfigurationKeys.WRITER_FILE_SYSTEM_URI,
ConfigurationKeys.LOCAL_FS_URI);
FileSystem fs = getFsWithProxy(state, writerFsUri,
WriterUtils.getFsConfiguration(state));
@@ -267,7 +266,7 @@ public class JobLauncherUtils {
* @return
* @throws IOException
*/
- private static FileSystem getFsWithProxy(final State state, final String
fsUri, final Configuration conf) throws IOException {
+ public static FileSystem getFsWithProxy(final State state, final String
fsUri, final Configuration conf) throws IOException {
if (!state.getPropAsBoolean(ConfigurationKeys.SHOULD_FS_PROXY_AS_USER,
ConfigurationKeys.DEFAULT_SHOULD_FS_PROXY_AS_USER)) {
return FileSystem.get(URI.create(fsUri), conf);