This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4107ae6 [GOBBLIN-1574] Added whitelist for iceberg tables to add new
partition column (#3426)
4107ae6 is described below
commit 4107ae649d8314ddc17a47b30110194ab9653dec
Author: vbohra <[email protected]>
AuthorDate: Thu Nov 18 15:54:14 2021 -0800
[GOBBLIN-1574] Added whitelist for iceberg tables to add new partition column
(#3426)
* [GOBBLIN-1574] Added whitelist for iceberg tables to add new partition
column
* Fixed failing test case
* Updated IcebergMetadataWriterTest to blacklist the test table from
non-completeness tests
* Moved dataset name update into TableMetadata
* Added newPartition checks in Table Metadata
* Fixed test case to include new_partition_enabled
Co-authored-by: Vikram Bohra <[email protected]>
---
.../iceberg/writer/IcebergMetadataWriter.java | 36 ++++++++++++++--------
.../writer/IcebergMetadataWriterConfigKeys.java | 5 +++
.../iceberg/writer/IcebergMetadataWriterTest.java | 1 +
3 files changed, 29 insertions(+), 13 deletions(-)
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index db01793..df5739e 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -165,6 +165,8 @@ public class IcebergMetadataWriter implements
MetadataWriter {
private final DateTimeFormatter HOURLY_DATEPARTITION_FORMAT;
private final String newPartitionColumn;
private final String newPartitionColumnType;
+ private final boolean newPartitionEnabled;
+ private final WhitelistBlacklist newPartitionTableWhitelistBlacklist;
private Optional<KafkaAuditCountVerifier> auditCountVerifier;
private final String auditCheckGranularity;
@@ -227,6 +229,9 @@ public class IcebergMetadataWriter implements
MetadataWriter {
this.auditCountVerifier = Optional.fromNullable(this.completenessEnabled ?
new KafkaAuditCountVerifier(state) : null);
this.newPartitionColumn = state.getProp(NEW_PARTITION_KEY,
DEFAULT_NEW_PARTITION);
this.newPartitionColumnType = state.getProp(NEW_PARTITION_TYPE_KEY,
DEFAULT_PARTITION_COLUMN_TYPE);
+ this.newPartitionEnabled =
state.getPropAsBoolean(ICEBERG_NEW_PARTITION_ENABLED,
DEFAULT_ICEBERG_NEW_PARTITION_ENABLED);
+ this.newPartitionTableWhitelistBlacklist = new
WhitelistBlacklist(state.getProp(ICEBERG_NEW_PARTITION_WHITELIST, ""),
+ state.getProp(ICEBERG_NEW_PARTITION_BLACKLIST, ""));
this.auditCheckGranularity = state.getProp(AUDIT_CHECK_GRANULARITY,
DEFAULT_AUDIT_CHECK_GRANULARITY);
}
@@ -475,6 +480,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
if(!table.spec().fields().stream().anyMatch(x ->
x.name().equalsIgnoreCase(fieldName))) {
table.updateSpec().addField(fieldName).commit();
}
+ table.refresh();
return table;
}
@@ -684,7 +690,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
Collection<HiveSpec> hiveSpecs = newSpecsMap.get(new
Path(file.getFilePath()).getParent().toString());
StructLike partition = getIcebergPartitionVal(hiveSpecs,
file.getFilePath(), partitionSpec);
- if(tableMetadata.completenessEnabled && gmce.getOperationType() ==
OperationType.add_files) {
+ if(tableMetadata.newPartitionColumnEnabled && gmce.getOperationType()
== OperationType.add_files) {
tableMetadata.prevCompletenessWatermark =
Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
// Assumes first partition value to be partitioned by date
@@ -705,6 +711,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
/**
* 1. Add "late" partition column to iceberg table if not exists
* 2. compute "late" partition value based on datepartition and completion
watermark
+ * 3. Default to late=0 if completion watermark check is disabled
* @param table
* @param tableMetadata
* @param hivePartition
@@ -713,10 +720,9 @@ public class IcebergMetadataWriter implements
MetadataWriter {
*/
private StructLike addLatePartitionValueToIcebergTable(Table table,
TableMetadata tableMetadata, HivePartition hivePartition, String datepartition)
{
table = addPartitionToIcebergTable(table, newPartitionColumn,
newPartitionColumnType);
- table.refresh();
PartitionSpec partitionSpec = table.spec();
long prevCompletenessWatermark = tableMetadata.prevCompletenessWatermark;
- int late = isLate(datepartition, prevCompletenessWatermark);
+ int late = !tableMetadata.completenessEnabled ? 0 : isLate(datepartition,
prevCompletenessWatermark);
List<String> partitionValues = new ArrayList<>(hivePartition.getValues());
partitionValues.add(String.valueOf(late));
return IcebergUtils.getPartition(partitionSpec.partitionType(),
partitionValues);
@@ -785,7 +791,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
- if(tableMetadata.completenessEnabled) {
+ if (tableMetadata.completenessEnabled) {
String topicName = props.get(TOPIC_NAME_KEY);
if(topicName == null) {
log.error(String.format("Not performing audit check. %s is null.
Please set as table property of %s.%s",
@@ -1011,8 +1017,10 @@ public class IcebergMetadataWriter implements
MetadataWriter {
GenericRecord genericRecord = recordEnvelope.getRecord();
GobblinMetadataChangeEvent gmce =
(GobblinMetadataChangeEvent)
SpecificData.get().deepCopy(genericRecord.getSchema(), genericRecord);
- if (whitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName())) {
- TableIdentifier tid =
TableIdentifier.of(tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName());
+ String dbName = tableSpec.getTable().getDbName();
+ String tableName = tableSpec.getTable().getTableName();
+ if (whitelistBlacklist.acceptTable(dbName, tableName)) {
+ TableIdentifier tid = TableIdentifier.of(dbName, tableName);
String topicPartition = tableTopicPartitionMap.computeIfAbsent(tid,
t -> recordEnvelope.getWatermark().getSource());
Long currentWatermark = getAndPersistCurrentWatermark(tid,
topicPartition);
@@ -1021,12 +1029,14 @@ public class IcebergMetadataWriter implements
MetadataWriter {
if (currentOffset > currentWatermark) {
if (!tableMetadataMap.computeIfAbsent(tid, t -> new
TableMetadata()).lowWatermark.isPresent()) {
//This means we haven't register this table or met some error
before, we need to reset the low watermark
- tableMetadataMap.computeIfAbsent(tid, t -> new
TableMetadata()).lowWatermark =
- Optional.of(currentOffset - 1);
- }
-
tableMetadataMap.get(tid).setDatasetName(gmce.getDatasetIdentifier().getNativeName());
- if(this.completenessEnabled &&
this.completenessWhitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName())) {
- tableMetadataMap.get(tid).setCompletenessEnabled(true);
+ tableMetadataMap.get(tid).lowWatermark = Optional.of(currentOffset
- 1);
+
tableMetadataMap.get(tid).setDatasetName(gmce.getDatasetIdentifier().getNativeName());
+ if (this.newPartitionEnabled &&
this.newPartitionTableWhitelistBlacklist.acceptTable(dbName, tableName)) {
+ tableMetadataMap.get(tid).newPartitionColumnEnabled = true;
+ if (this.completenessEnabled &&
this.completenessWhitelistBlacklist.acceptTable(dbName, tableName)) {
+ tableMetadataMap.get(tid).completenessEnabled = true;
+ }
+ }
}
write(gmce, newSpecsMap, oldSpecsMap, tableSpec);
@@ -1080,8 +1090,8 @@ public class IcebergMetadataWriter implements
MetadataWriter {
@Setter
String datasetName;
- @Setter
boolean completenessEnabled;
+ boolean newPartitionColumnEnabled;
Cache<CharSequence, String> addedFiles = CacheBuilder.newBuilder()
.expireAfterAccess(conf.getInt(ADDED_FILES_CACHE_EXPIRING_TIME,
DEFAULT_ADDED_FILES_CACHE_EXPIRING_TIME),
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
index 65c4e57..c846270 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
@@ -36,5 +36,10 @@ public class IcebergMetadataWriterConfigKeys {
public static final String TOPIC_NAME_KEY = "topic.name";
public static final String AUDIT_CHECK_GRANULARITY =
"iceberg.completeness.audit.check.granularity";
public static final String DEFAULT_AUDIT_CHECK_GRANULARITY = "HOUR";
+ public static final String ICEBERG_NEW_PARTITION_ENABLED =
"iceberg.new.partition.enabled";
+ public static final boolean DEFAULT_ICEBERG_NEW_PARTITION_ENABLED = false;
+ public static final String ICEBERG_NEW_PARTITION_WHITELIST =
"iceberg.new.partition.whitelist";
+ public static final String ICEBERG_NEW_PARTITION_BLACKLIST =
"iceberg.new.partition.blacklist";
+
}
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index a1d9b18..44c5514 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -179,6 +179,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
private State getStateWithCompletenessConfig() {
State state = getState();
+ state.setProp(ICEBERG_NEW_PARTITION_ENABLED, true);
state.setProp(ICEBERG_COMPLETENESS_ENABLED, true);
state.setProp(NEW_PARTITION_KEY, "late");
state.setProp(NEW_PARTITION_TYPE_KEY, "int");