This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new edc2b3a  [GOBBLIN-785] remove wrapper isPartition function, use 
table.isPartitioned instead
edc2b3a is described below

commit edc2b3a5b4073d9a0e50abea609a27d082ddbb9d
Author: Jay Sen <[email protected]>
AuthorDate: Tue Oct 1 15:46:59 2019 -0700

    [GOBBLIN-785] remove wrapper isPartition function, use table.isPartitioned 
instead
    
    Closes #2650 from jhsenjaliya/GOBBLIN-785
---
 .../conversion/hive/source/HiveSource.java           |  2 +-
 .../conversion/hive/task/HiveConverterUtils.java     |  2 +-
 .../hive/watermarker/PartitionLevelWatermarker.java  |  4 ++--
 .../management/copy/hive/HiveCopyEntityHelper.java   | 20 ++++++++++----------
 .../gobblin/data/management/copy/hive/HiveUtils.java |  3 ++-
 .../finder/AbstractHiveDatasetVersionFinder.java     |  2 +-
 .../conversion/hive/validation/ValidationJob.java    |  2 +-
 7 files changed, 18 insertions(+), 17 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
index 3ad99fd..a0b9125 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
@@ -169,7 +169,7 @@ public class HiveSource implements Source {
           log.debug(String.format("Processing dataset: %s", hiveDataset));
 
           // Create workunits for partitions
-          if (HiveUtils.isPartitioned(hiveDataset.getTable())
+          if (hiveDataset.getTable().isPartitioned()
               && 
state.getPropAsBoolean(HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS,
               DEFAULT_HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS)) {
             createWorkunitsForPartitionedTable(hiveDataset, client);
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
index a08f61a..bbad3a4 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
@@ -435,7 +435,7 @@ public class HiveConverterUtils {
         table = Optional.of(client.get().getTable(dbName, tableName));
         if (table.isPresent()) {
           org.apache.hadoop.hive.ql.metadata.Table qlTable = new 
org.apache.hadoop.hive.ql.metadata.Table(table.get());
-          if (HiveUtils.isPartitioned(qlTable)) {
+          if (qlTable.isPartitioned()) {
             partitions = Optional.of(HiveUtils.getPartitions(client.get(), 
qlTable, Optional.<String>absent()));
           }
         }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarker.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarker.java
index 5fd3f76..9b51f83 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarker.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarker.java
@@ -306,8 +306,8 @@ public class PartitionLevelWatermarker implements 
HiveSourceWatermarker {
 
         // Watermark workunits are required only for Partitioned tables
         // tableKey is table complete name in the format db@table
-        if (!HiveUtils.isPartitioned(new 
org.apache.hadoop.hive.ql.metadata.Table(client.get().getTable(
-            tableKey.split("@")[0], tableKey.split("@")[1])))) {
+        if (!(new 
org.apache.hadoop.hive.ql.metadata.Table(client.get().getTable(
+            tableKey.split("@")[0], tableKey.split("@")[1])).isPartitioned())) 
{
           continue;
         }
         // We only keep watermarks for partitions that were updated after 
leastWatermarkToPersistInState
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index 019667d..84adc27 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -350,7 +350,7 @@ public class HiveCopyEntityHelper {
         }
 
         // Constructing CommitStep object for table registration
-        Path targetPath = getTargetLocation(dataset.fs, this.targetFs, 
dataset.table.getDataLocation(),
+        Path targetPath = getTargetLocation(this.dataset.fs, this.targetFs, 
this.dataset.table.getDataLocation(),
             Optional.<Partition> absent());
         this.targetTable = getTargetTable(this.dataset.table, targetPath);
         HiveSpec tableHiveSpec = new SimpleHiveSpec.Builder<>(targetPath)
@@ -363,7 +363,7 @@ public class HiveCopyEntityHelper {
         if (this.existingTargetTable.isPresent() && 
this.existingTargetTable.get().isPartitioned()) {
           checkPartitionedTableCompatibility(this.targetTable, 
this.existingTargetTable.get());
         }
-        if (HiveUtils.isPartitioned(this.dataset.table)) {
+        if (this.dataset.table.isPartitioned()) {
           this.sourcePartitions = 
HiveUtils.getPartitionsMap(multiClient.getClient(source_client), 
this.dataset.table,
               this.partitionFilter, this.hivePartitionExtendedFilter);
           
HiveAvroCopyEntityHelper.updatePartitionAttributesIfAvro(this.targetTable, 
this.sourcePartitions, this);
@@ -407,7 +407,7 @@ public class HiveCopyEntityHelper {
    */
   Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration 
configuration, Comparator<FileSet<CopyEntity>> prioritizer,
       PushDownRequestor<FileSet<CopyEntity>> requestor) throws IOException {
-    if (HiveUtils.isPartitioned(this.dataset.table)) {
+    if (this.dataset.table.isPartitioned()) {
       return new PartitionIterator(this.sourcePartitions, configuration, 
prioritizer, requestor);
     } else {
       FileSet<CopyEntity> fileSet = new 
UnpartitionedTableFileSet(this.dataset.table.getCompleteName(), this.dataset, 
this);
@@ -645,12 +645,12 @@ public class HiveCopyEntityHelper {
         // destination has higher version, skip the copy
         if (srcVer.compareTo(dstVer) <= 0) {
           if (!helper.isEnforceFileSizeMatch() || 
existingTargetStatus.getLen() == sourcePath.getLen()) {
-            log.debug("Copy from src {} (v:{}) to dst {} (v:{}) can be 
skipped.",
-                sourcePath.getPath(), srcVer, existingTargetStatus.getPath(), 
dstVer);
+            log.debug("Copy from src {} (version:{}) to dst {} (version:{}) 
can be skipped since file size ({} bytes) is matching",
+                sourcePath.getPath(), srcVer, existingTargetStatus.getPath(), 
dstVer, sourcePath.getLen());
             shouldCopy = false;
           } else {
-            log.debug("Copy from src {} (v:{}) to dst {} (v:{}) can not be 
skipped due to unmatched file length.",
-                sourcePath.getPath(), srcVer, existingTargetStatus.getPath(), 
dstVer);
+            log.debug("Copy from src {} (version:{}) to dst {} (version:{}) 
can not be skipped because the file size is not matching or it is enforced by 
this config: {}",
+                sourcePath.getPath(), srcVer, existingTargetStatus.getPath(), 
dstVer, CopyConfiguration.ENFORCE_FILE_LENGTH_MATCH);
           }
         } else {
           log.debug("Copy from src {} (v:{}) to dst {} (v:{}) is needed due to 
a higher version.",
@@ -707,11 +707,11 @@ public class HiveCopyEntityHelper {
           existingTargetTable.getDataLocation());
     }
 
-    if (HiveUtils.isPartitioned(desiredTargetTable) != 
HiveUtils.isPartitioned(existingTargetTable)) {
+    if (desiredTargetTable.isPartitioned() != 
existingTargetTable.isPartitioned()) {
       throw new IOException(String.format(
           "%s: Desired target table %s partitioned, existing target table %s 
partitioned. Tables are incompatible.",
-          this.dataset.tableIdentifier, 
HiveUtils.isPartitioned(desiredTargetTable) ? "is" : "is not",
-          HiveUtils.isPartitioned(existingTargetTable) ? "is" : "is not"));
+          this.dataset.tableIdentifier, desiredTargetTable.isPartitioned() ? 
"is" : "is not",
+          existingTargetTable.isPartitioned() ? "is" : "is not"));
     }
     if (desiredTargetTable.isPartitioned()
         && 
!desiredTargetTable.getPartitionKeys().equals(existingTargetTable.getPartitionKeys()))
 {
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
index db56b4c..7f66e9d 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
@@ -60,7 +60,7 @@ public class HiveUtils {
    * @param table the {@link Table} for which we should get partitions.
    * @param filter an optional filter for partitions as would be used in Hive. 
Can only filter on String columns.
    *               (e.g. "part = \"part1\"" or "date > \"2015\"".
-   * @return a map of values to {@link Partition} for input {@link Table}.
+   * @return a map of values to {@link Partition} for input {@link Table} 
filtered and non-nullified.
    */
   public static Map<List<String>, Partition> getPartitionsMap(IMetaStoreClient 
client, Table table,
       Optional<String> filter, Optional<? extends HivePartitionExtendedFilter> 
hivePartitionExtendedFilterOptional) throws IOException {
@@ -165,6 +165,7 @@ public class HiveUtils {
 
   /**
    * @return true if {@link Table} is partitioned.
+   * @deprecated use {@link Table}'s isPartitioned method directly.
    */
   public static boolean isPartitioned(Table table) {
     return table.isPartitioned();
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractHiveDatasetVersionFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractHiveDatasetVersionFinder.java
index ce167b4..f122048 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractHiveDatasetVersionFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractHiveDatasetVersionFinder.java
@@ -68,7 +68,7 @@ public abstract class AbstractHiveDatasetVersionFinder 
implements VersionFinder<
     }
     final HiveDataset hiveDataset = (HiveDataset) dataset;
 
-    if (!HiveUtils.isPartitioned(hiveDataset.getTable())) {
+    if (!hiveDataset.getTable().isPartitioned()) {
       throw new IllegalArgumentException("HiveDatasetVersionFinder is only 
compatible with partitioned hive tables");
     }
 
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
index 1a61803..57117cb 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
@@ -293,7 +293,7 @@ public class ValidationJob extends AbstractJob {
 
           // Validate dataset
           log.info(String.format("Validating dataset: %s", hiveDataset));
-          if (HiveUtils.isPartitioned(hiveDataset.getTable())) {
+          if (hiveDataset.getTable().isPartitioned()) {
             processPartitionedTable(hiveDataset, client);
           } else {
             processNonPartitionedTable(hiveDataset);

Reply via email to