This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 91a2d1c  [GOBBLIN-1602] Change hive table location and partition check 
to validate using FS r… (#3459)
91a2d1c is described below

commit 91a2d1cc679a8285f7b05f76d16c17eb20206525
Author: William Lo <[email protected]>
AuthorDate: Thu Feb 17 10:42:26 2022 -0800

    [GOBBLIN-1602] Change hive table location and partition check to validate 
using FS r… (#3459)
    
    * Change hive table location and partition check to validate using FS 
resolvePath to resolve logical paths
    
    * Add tests for Unpartitioned file set
    
    * Address review, add additional throw if locations mismatch for partition 
location validation
    
    * Fix checkstyles again
    
    * allow partial success policy for workunits
---
 .../management/copy/hive/HiveCopyEntityHelper.java |  6 ++--
 .../management/copy/hive/HivePartitionFileSet.java | 20 +++++++----
 .../data/management/copy/hive/HiveUtils.java       | 17 +++++++++
 .../copy/hive/UnpartitionedTableFileSet.java       |  6 ++--
 .../copy/hive/UnpartitionedTableFileSetTest.java   | 40 ++++++++++++++++++++++
 5 files changed, 78 insertions(+), 11 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index 1403e20..a1ce36c 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -750,9 +750,9 @@ public class HiveCopyEntityHelper {
 
   private void checkPartitionedTableCompatibility(Table desiredTargetTable, 
Table existingTargetTable)
       throws IOException {
-    if 
(!desiredTargetTable.getDataLocation().equals(existingTargetTable.getDataLocation()))
 {
-      throw new 
HiveTableLocationNotMatchException(desiredTargetTable.getDataLocation(),
-          existingTargetTable.getDataLocation());
+
+    if (!HiveUtils.areTablePathsEquivalent(this.targetFs, 
desiredTargetTable.getDataLocation(), existingTargetTable.getDataLocation())) {
+      throw new 
HiveTableLocationNotMatchException(desiredTargetTable.getDataLocation(), 
existingTargetTable.getDataLocation());
    }
 
     if (desiredTargetTable.isPartitioned() != 
existingTargetTable.isPartitioned()) {
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
index 8f1f208..62ead53 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HivePartitionFileSet.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Properties;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.data.management.copy.CopyableFile;
 import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
@@ -40,6 +41,8 @@ import org.apache.gobblin.hive.spec.HiveSpec;
 import org.apache.gobblin.hive.spec.SimpleHiveSpec;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.MultiTimingEvent;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.commit.DeleteFileCommitStep;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -99,7 +102,13 @@ public class HivePartitionFileSet extends HiveFileSet {
               hiveCopyEntityHelper.getExistingEntityPolicy() != 
HiveCopyEntityHelper.ExistingEntityPolicy.REPLACE_TABLE_AND_PARTITIONS) {
             log.error("Source and target partitions are not compatible. 
Aborting copy of partition " + this.partition,
                 ioe);
-            return Lists.newArrayList();
+            // Silence error and continue processing workunits if we allow 
partial success
+            if 
(ConfigUtils.getString(hiveCopyEntityHelper.getConfiguration().getConfig(), 
ConfigurationKeys.JOB_COMMIT_POLICY_KEY,
+                
JobCommitPolicy.COMMIT_ON_FULL_SUCCESS.toString()).equals(JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS.toString()))
 {
+              return Lists.newArrayList();
+            } else {
+              throw ioe;
+            }
           }
           log.warn("Source and target partitions are not compatible. Will 
override target partition: " + ioe.getMessage());
           log.debug("Incompatibility details: ", ioe);
@@ -194,12 +203,11 @@ public class HivePartitionFileSet extends HiveFileSet {
     }
   }
 
-  private static void checkPartitionCompatibility(Partition 
desiredTargetPartition, Partition existingTargetPartition)
+  private void checkPartitionCompatibility(Partition desiredTargetPartition, 
Partition existingTargetPartition)
       throws IOException {
-    if 
(!desiredTargetPartition.getDataLocation().equals(existingTargetPartition.getDataLocation()))
 {
-      throw new IOException(
-          String.format("Desired target location %s and already registered 
target location %s do not agree.",
-              desiredTargetPartition.getDataLocation(), 
existingTargetPartition.getDataLocation()));
+    if (!HiveUtils.areTablePathsEquivalent(hiveCopyEntityHelper.getTargetFs(), 
desiredTargetPartition.getDataLocation(),
+        existingTargetPartition.getDataLocation())) {
+      throw new 
HiveTableLocationNotMatchException(desiredTargetPartition.getDataLocation(), 
existingTargetPartition.getDataLocation());
     }
   }
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
index a982e8a..0bc57b4 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.data.management.copy.hive;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -24,6 +25,7 @@ import java.util.Set;
 
 import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -167,4 +169,19 @@ public class HiveUtils {
   public static boolean isPartitioned(Table table) {
     return table.isPartitioned();
   }
+
+  /**
+   * @param fs User configured filesystem of the target table
+   * @param userSpecifiedPath user specified path of the copy table location 
or partition
+   * @param existingTablePath path of an already registered Hive table or 
partition
+   * @return true if the filesystem resolves them to be equivalent, false 
otherwise
+   */
+  public static boolean areTablePathsEquivalent(FileSystem fs, Path 
userSpecifiedPath, Path existingTablePath) throws IOException {
+    try {
+      return 
fs.resolvePath(existingTablePath).equals(fs.resolvePath(userSpecifiedPath));
+    } catch (FileNotFoundException e) {
+      // At least one of the two paths failed to resolve (does not exist), so 
they cannot be equivalent
+      return false;
+    }
+  }
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
index 86cc490..527fd12 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSet.java
@@ -64,11 +64,13 @@ public class UnpartitionedTableFileSet extends HiveFileSet {
 
     Optional<Table> existingTargetTable = this.helper.getExistingTargetTable();
     if (existingTargetTable.isPresent()) {
-      if 
(!this.helper.getTargetTable().getDataLocation().equals(existingTargetTable.get().getDataLocation()))
 {
+      // Use update policy if user defined table path for their copy location 
does not match pre-existing table path
+      if (!HiveUtils.areTablePathsEquivalent(this.helper.getTargetFs(), 
this.helper.getTargetTable().getDataLocation(),
+          existingTargetTable.get().getDataLocation())) {
         switch (this.helper.getExistingEntityPolicy()){
           case UPDATE_TABLE:
             // Update the location of files while keep the existing table 
entity.
-            log.warn("Source table will not be deregistered while file 
locaiton has been changed, update source table's"
+            log.warn("Source table will not be deregistered while file 
location has been changed, update source table's"
                 + " file location to" + 
this.helper.getTargetTable().getDataLocation());
             existingTargetTable = Optional.absent();
             break ;
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSetTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSetTest.java
index b3102e4..28ab815 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSetTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/UnpartitionedTableFileSetTest.java
@@ -18,13 +18,17 @@
 package org.apache.gobblin.data.management.copy.hive;
 
 import com.google.common.base.Optional;
+import com.google.common.base.Predicates;
 import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.junit.Assert;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
+import static org.mockito.AdditionalAnswers.returnsFirstArg;
 
 import java.util.List;
 
@@ -44,11 +48,47 @@ public class UnpartitionedTableFileSetTest {
         Mockito.when(helper.getDataset()).thenReturn(hiveDataset);
         
Mockito.when(helper.getExistingTargetTable()).thenReturn(Optional.of(existingTargetTable));
         Mockito.when(helper.getTargetTable()).thenReturn(table);
+        // Mock filesystem resolver
+        FileSystem mockFS = Mockito.mock(FileSystem.class);
+        Mockito.when(helper.getTargetFs()).thenReturn(mockFS);
+        
Mockito.when(mockFS.resolvePath(Mockito.any())).then(returnsFirstArg());
+
+        
Mockito.when(helper.getExistingEntityPolicy()).thenReturn(HiveCopyEntityHelper.ExistingEntityPolicy.ABORT);
+        MetricContext metricContext = 
MetricContext.builder("testUnpartitionedTableFileSet").build();
+        EventSubmitter eventSubmitter = new 
EventSubmitter.Builder(metricContext,"loc.nomatch.exp").build();
+        Mockito.when(helper.getEventSubmitter()).thenReturn(eventSubmitter);
+        UnpartitionedTableFileSet upts = new 
UnpartitionedTableFileSet("testLocationMatch",hiveDataset,helper);
+        List<CopyEntity> copyEntities = 
(List<CopyEntity>)upts.generateCopyEntities();
+    }
+
+    @Test
+    public void testHiveTableLocationMatchDifferentPathsResolved() throws 
Exception {
+        Path testPath = new Path("/testPath/db/table");
+        Path existingTablePath = new Path("/existing/testPath/db/table");
+        Table table = new Table("testDb","table1");
+        table.setDataLocation(testPath);
+        Table existingTargetTable = new Table("testDb","table1");
+        existingTargetTable.setDataLocation(existingTablePath);
+        HiveDataset hiveDataset = Mockito.mock(HiveDataset.class);
+        Mockito.when(hiveDataset.getTable()).thenReturn(table);
+        HiveCopyEntityHelper helper = Mockito.mock(HiveCopyEntityHelper.class);
+        Mockito.when(helper.getDataset()).thenReturn(hiveDataset);
+        
Mockito.when(helper.getExistingTargetTable()).thenReturn(Optional.of(existingTargetTable));
+        Mockito.when(helper.getTargetTable()).thenReturn(table);
+        // Only test that the files will be empty and hive will mark that the 
paths are equivalent, shortcircuit out
+        
Mockito.when(helper.getFastTableSkip()).thenReturn(Optional.of(Predicates.alwaysTrue()));
+        // Mock filesystem resolver
+        FileSystem mockFS = Mockito.mock(FileSystem.class);
+        Mockito.when(helper.getTargetFs()).thenReturn(mockFS);
+        Mockito.when(mockFS.resolvePath(Mockito.any())).thenReturn(new 
Path("hdfs://testPath/db/table"));
+
         
Mockito.when(helper.getExistingEntityPolicy()).thenReturn(HiveCopyEntityHelper.ExistingEntityPolicy.ABORT);
         MetricContext metricContext = 
MetricContext.builder("testUnpartitionedTableFileSet").build();
         EventSubmitter eventSubmitter = new 
EventSubmitter.Builder(metricContext,"loc.nomatch.exp").build();
         Mockito.when(helper.getEventSubmitter()).thenReturn(eventSubmitter);
         UnpartitionedTableFileSet upts = new 
UnpartitionedTableFileSet("testLocationMatch",hiveDataset,helper);
         List<CopyEntity> copyEntities = 
(List<CopyEntity>)upts.generateCopyEntities();
+        // Size should be 0 since fast table skip predicate is always true
+        Assert.assertEquals(copyEntities.size(), 0);
     }
 }

Reply via email to