This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ea1964a [GOBBLIN-1206] Only populate path to dest-table if src-table
has it as storageParam
ea1964a is described below
commit ea1964a1f8dbb7369b23da7771789d6a0f7098e0
Author: Lei Sun <[email protected]>
AuthorDate: Thu Jun 25 21:37:44 2020 -0700
[GOBBLIN-1206] Only populate path to dest-table if src-table has it as
storageParam
Closes #3051 from autumnust/hivePathParam
---
.../management/copy/hive/HiveCopyEntityHelper.java | 35 ++++++++++++++--------
.../management/copy/hive/HivePartitionFileSet.java | 6 +++-
.../copy/hive/HiveCopyEntityHelperTest.java | 18 +++++++++++
.../org/apache/gobblin/hive/HiveConstants.java | 1 +
4 files changed, 46 insertions(+), 14 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index 7fa22e0..444d193 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -478,19 +478,7 @@ public class HiveCopyEntityHelper {
private Table getTargetTable(Table originTable, Path targetLocation) throws
IOException {
try {
Table targetTable = originTable.copy();
-
- targetTable.setDbName(this.targetDatabase);
- targetTable.setDataLocation(targetLocation);
- /*
- * Need to set the table owner as the flow executor
- */
-
targetTable.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
- targetTable.getTTable().putToParameters(HiveDataset.REGISTERER,
GOBBLIN_DISTCP);
-
targetTable.getTTable().putToParameters(HiveDataset.REGISTRATION_GENERATION_TIME_MILLIS,
- Long.toString(this.startTime));
-
targetTable.getTTable().getSd().getSerdeInfo().getParameters().put(HiveConstants.PATH,
targetLocation.toString());
- targetTable.getTTable().unsetCreateTime();
-
+ HiveCopyEntityHelper.addMetadataToTargetTable(targetTable,
targetLocation, this.targetDatabase, this.startTime);
HiveAvroCopyEntityHelper.updateTableAttributesIfAvro(targetTable, this);
return targetTable;
} catch (HiveException he) {
@@ -498,6 +486,27 @@ public class HiveCopyEntityHelper {
}
}
+ @VisibleForTesting
+ static void addMetadataToTargetTable(Table targetTable, Path targetLocation,
String targetDatabase, long startTime)
+ throws IOException {
+ targetTable.setDbName(targetDatabase);
+ targetTable.setDataLocation(targetLocation);
+ /*
+ * Need to set the table owner as the flow executor
+ */
+
targetTable.setOwner(UserGroupInformation.getCurrentUser().getShortUserName());
+ targetTable.getTTable().putToParameters(HiveDataset.REGISTERER,
GOBBLIN_DISTCP);
+
targetTable.getTTable().putToParameters(HiveDataset.REGISTRATION_GENERATION_TIME_MILLIS,
+ Long.toString(startTime));
+
+ /**
+ * Only overwrite the "path" storage parameter when the source table already has it.
+ */
+ targetTable.getTTable().getSd().getSerdeInfo().getParameters()
+ .computeIfPresent(HiveConstants.PATH, (k,v) ->
targetLocation.toString());
+ targetTable.getTTable().unsetCreateTime();
+ }
+
int addPartitionDeregisterSteps(List<CopyEntity> copyEntities, String
fileSet, int initialPriority,
Table table, Partition partition) throws IOException {
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
index 7a90b59..efa4e4c 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
@@ -186,7 +186,11 @@ public class HivePartitionFileSet extends HiveFileSet {
targetPartition.getTPartition().putToParameters(HiveDataset.REGISTRATION_GENERATION_TIME_MILLIS,
Long.toString(this.hiveCopyEntityHelper.getStartTime()));
targetPartition.setLocation(targetLocation.toString());
-
targetPartition.getTPartition().getSd().getSerdeInfo().getParameters().put(HiveConstants.PATH,
targetLocation.toString());
+ /**
+ * Only overwrite the "path" storage parameter when the source partition already has it.
+ */
+ targetPartition.getTPartition().getSd().getSerdeInfo().getParameters()
+ .computeIfPresent(HiveConstants.PATH, (k,v) ->
targetLocation.toString());
targetPartition.getTPartition().unsetCreateTime();
return targetPartition;
} catch (HiveException he) {
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
index 91a2e67..30b5e17 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelperTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.data.management.copy.hive;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -347,6 +348,23 @@ public class HiveCopyEntityHelperTest {
Assert.assertEquals(HiveCopyEntityHelper.replacedPrefix(sourcePath,
prefixTobeReplaced, prefixReplacement), expected);
}
+ @Test
+ public void testAddMetadataToTargetTable() throws Exception {
+ org.apache.hadoop.hive.ql.metadata.Table meta_table =
+ new Table(Table.getEmptyTable("testDB", "testTable"));
+
+ Map<String, String> storageParams = new HashMap<>();
+ storageParams.put("path", "randomPath");
+ meta_table.getSd().getSerdeInfo().setParameters(storageParams);
+ HiveCopyEntityHelper.addMetadataToTargetTable(meta_table, new
Path("newPath"), "testDB", 10L);
+
Assert.assertEquals(meta_table.getSd().getSerdeInfo().getParameters().get("path"),
"newPath");
+
+ storageParams.clear();
+ meta_table.getSd().getSerdeInfo().setParameters(storageParams);
+ HiveCopyEntityHelper.addMetadataToTargetTable(meta_table, new
Path("newPath"), "testDB", 10L);
+
Assert.assertFalse(meta_table.getSd().getSerdeInfo().getParameters().containsKey("path"));
+ }
+
private boolean containsPath(Collection<FileStatus> statuses, Path path) {
for (FileStatus status : statuses) {
if (status.getPath().equals(path)) {
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConstants.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConstants.java
index 3444cc8..9ec44e5 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConstants.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveConstants.java
@@ -47,6 +47,7 @@ public class HiveConstants {
* Storage properties
*/
public static final String LOCATION = "location";
+ // A storage parameter that is managed by Spark for Spark Datasource tables
public static final String PATH = "path";
public static final String INPUT_FORMAT = "input.format";
public static final String OUTPUT_FORMAT = "output.format";