This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new edc2b3a [GOBBLIN-785] remove wrapper isPartition function, use
table.isPartitioned instead
edc2b3a is described below
commit edc2b3a5b4073d9a0e50abea609a27d082ddbb9d
Author: Jay Sen <[email protected]>
AuthorDate: Tue Oct 1 15:46:59 2019 -0700
[GOBBLIN-785] remove wrapper isPartition function, use table.isPartitioned
instead
Closes #2650 from jhsenjaliya/GOBBLIN-785
---
.../conversion/hive/source/HiveSource.java | 2 +-
.../conversion/hive/task/HiveConverterUtils.java | 2 +-
.../hive/watermarker/PartitionLevelWatermarker.java | 4 ++--
.../management/copy/hive/HiveCopyEntityHelper.java | 20 ++++++++++----------
.../gobblin/data/management/copy/hive/HiveUtils.java | 3 ++-
.../finder/AbstractHiveDatasetVersionFinder.java | 2 +-
.../conversion/hive/validation/ValidationJob.java | 2 +-
7 files changed, 18 insertions(+), 17 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
index 3ad99fd..a0b9125 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
@@ -169,7 +169,7 @@ public class HiveSource implements Source {
log.debug(String.format("Processing dataset: %s", hiveDataset));
// Create workunits for partitions
- if (HiveUtils.isPartitioned(hiveDataset.getTable())
+ if (hiveDataset.getTable().isPartitioned()
&&
state.getPropAsBoolean(HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS,
DEFAULT_HIVE_SOURCE_CREATE_WORKUNITS_FOR_PARTITIONS)) {
createWorkunitsForPartitionedTable(hiveDataset, client);
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
index a08f61a..bbad3a4 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
@@ -435,7 +435,7 @@ public class HiveConverterUtils {
table = Optional.of(client.get().getTable(dbName, tableName));
if (table.isPresent()) {
org.apache.hadoop.hive.ql.metadata.Table qlTable = new
org.apache.hadoop.hive.ql.metadata.Table(table.get());
- if (HiveUtils.isPartitioned(qlTable)) {
+ if (qlTable.isPartitioned()) {
partitions = Optional.of(HiveUtils.getPartitions(client.get(),
qlTable, Optional.<String>absent()));
}
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarker.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarker.java
index 5fd3f76..9b51f83 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarker.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarker.java
@@ -306,8 +306,8 @@ public class PartitionLevelWatermarker implements
HiveSourceWatermarker {
// Watermark workunits are required only for Partitioned tables
// tableKey is table complete name in the format db@table
- if (!HiveUtils.isPartitioned(new
org.apache.hadoop.hive.ql.metadata.Table(client.get().getTable(
- tableKey.split("@")[0], tableKey.split("@")[1])))) {
+ if (!(new
org.apache.hadoop.hive.ql.metadata.Table(client.get().getTable(
+ tableKey.split("@")[0], tableKey.split("@")[1])).isPartitioned()))
{
continue;
}
// We only keep watermarks for partitions that were updated after
leastWatermarkToPersistInState
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
index 019667d..84adc27 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveCopyEntityHelper.java
@@ -350,7 +350,7 @@ public class HiveCopyEntityHelper {
}
// Constructing CommitStep object for table registration
- Path targetPath = getTargetLocation(dataset.fs, this.targetFs,
dataset.table.getDataLocation(),
+ Path targetPath = getTargetLocation(this.dataset.fs, this.targetFs,
this.dataset.table.getDataLocation(),
Optional.<Partition> absent());
this.targetTable = getTargetTable(this.dataset.table, targetPath);
HiveSpec tableHiveSpec = new SimpleHiveSpec.Builder<>(targetPath)
@@ -363,7 +363,7 @@ public class HiveCopyEntityHelper {
if (this.existingTargetTable.isPresent() &&
this.existingTargetTable.get().isPartitioned()) {
checkPartitionedTableCompatibility(this.targetTable,
this.existingTargetTable.get());
}
- if (HiveUtils.isPartitioned(this.dataset.table)) {
+ if (this.dataset.table.isPartitioned()) {
this.sourcePartitions =
HiveUtils.getPartitionsMap(multiClient.getClient(source_client),
this.dataset.table,
this.partitionFilter, this.hivePartitionExtendedFilter);
HiveAvroCopyEntityHelper.updatePartitionAttributesIfAvro(this.targetTable,
this.sourcePartitions, this);
@@ -407,7 +407,7 @@ public class HiveCopyEntityHelper {
*/
Iterator<FileSet<CopyEntity>> getCopyEntities(CopyConfiguration
configuration, Comparator<FileSet<CopyEntity>> prioritizer,
PushDownRequestor<FileSet<CopyEntity>> requestor) throws IOException {
- if (HiveUtils.isPartitioned(this.dataset.table)) {
+ if (this.dataset.table.isPartitioned()) {
return new PartitionIterator(this.sourcePartitions, configuration,
prioritizer, requestor);
} else {
FileSet<CopyEntity> fileSet = new
UnpartitionedTableFileSet(this.dataset.table.getCompleteName(), this.dataset,
this);
@@ -645,12 +645,12 @@ public class HiveCopyEntityHelper {
// destination has higher version, skip the copy
if (srcVer.compareTo(dstVer) <= 0) {
if (!helper.isEnforceFileSizeMatch() ||
existingTargetStatus.getLen() == sourcePath.getLen()) {
- log.debug("Copy from src {} (v:{}) to dst {} (v:{}) can be
skipped.",
- sourcePath.getPath(), srcVer, existingTargetStatus.getPath(),
dstVer);
+ log.debug("Copy from src {} (version:{}) to dst {} (version:{})
can be skipped since the file size ({} bytes) matches",
+ sourcePath.getPath(), srcVer, existingTargetStatus.getPath(),
dstVer, sourcePath.getLen());
shouldCopy = false;
} else {
- log.debug("Copy from src {} (v:{}) to dst {} (v:{}) can not be
skipped due to unmatched file length.",
- sourcePath.getPath(), srcVer, existingTargetStatus.getPath(),
dstVer);
+ log.debug("Copy from src {} (version:{}) to dst {} (version:{})
cannot be skipped because the file sizes do not match, or size matching is
enforced by this config: {}",
+ sourcePath.getPath(), srcVer, existingTargetStatus.getPath(),
dstVer, CopyConfiguration.ENFORCE_FILE_LENGTH_MATCH);
}
} else {
log.debug("Copy from src {} (v:{}) to dst {} (v:{}) is needed due to
a higher version.",
@@ -707,11 +707,11 @@ public class HiveCopyEntityHelper {
existingTargetTable.getDataLocation());
}
- if (HiveUtils.isPartitioned(desiredTargetTable) !=
HiveUtils.isPartitioned(existingTargetTable)) {
+ if (desiredTargetTable.isPartitioned() !=
existingTargetTable.isPartitioned()) {
throw new IOException(String.format(
"%s: Desired target table %s partitioned, existing target table %s
partitioned. Tables are incompatible.",
- this.dataset.tableIdentifier,
HiveUtils.isPartitioned(desiredTargetTable) ? "is" : "is not",
- HiveUtils.isPartitioned(existingTargetTable) ? "is" : "is not"));
+ this.dataset.tableIdentifier, desiredTargetTable.isPartitioned() ?
"is" : "is not",
+ existingTargetTable.isPartitioned() ? "is" : "is not"));
}
if (desiredTargetTable.isPartitioned()
&&
!desiredTargetTable.getPartitionKeys().equals(existingTargetTable.getPartitionKeys()))
{
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
index db56b4c..7f66e9d 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/HiveUtils.java
@@ -60,7 +60,7 @@ public class HiveUtils {
* @param table the {@link Table} for which we should get partitions.
* @param filter an optional filter for partitions as would be used in Hive.
Can only filter on String columns.
* (e.g. "part = \"part1\"" or "date > \"2015\"".
- * @return a map of values to {@link Partition} for input {@link Table}.
+ * @return a map of values to {@link Partition} for the input {@link Table},
filtered, with null partitions excluded.
*/
public static Map<List<String>, Partition> getPartitionsMap(IMetaStoreClient
client, Table table,
Optional<String> filter, Optional<? extends HivePartitionExtendedFilter>
hivePartitionExtendedFilterOptional) throws IOException {
@@ -165,6 +165,7 @@ public class HiveUtils {
/**
* @return true if {@link Table} is partitioned.
+ * @deprecated Use {@link Table#isPartitioned()} directly.
*/
public static boolean isPartitioned(Table table) {
return table.isPartitioned();
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractHiveDatasetVersionFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractHiveDatasetVersionFinder.java
index ce167b4..f122048 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractHiveDatasetVersionFinder.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/version/finder/AbstractHiveDatasetVersionFinder.java
@@ -68,7 +68,7 @@ public abstract class AbstractHiveDatasetVersionFinder
implements VersionFinder<
}
final HiveDataset hiveDataset = (HiveDataset) dataset;
- if (!HiveUtils.isPartitioned(hiveDataset.getTable())) {
+ if (!hiveDataset.getTable().isPartitioned()) {
throw new IllegalArgumentException("HiveDatasetVersionFinder is only
compatible with partitioned hive tables");
}
diff --git
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
index 1a61803..57117cb 100644
---
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
+++
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
@@ -293,7 +293,7 @@ public class ValidationJob extends AbstractJob {
// Validate dataset
log.info(String.format("Validating dataset: %s", hiveDataset));
- if (HiveUtils.isPartitioned(hiveDataset.getTable())) {
+ if (hiveDataset.getTable().isPartitioned()) {
processPartitionedTable(hiveDataset, client);
} else {
processNonPartitionedTable(hiveDataset);