This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new eda83a259 [GOBBLIN-1810] Support general iceberg catalog in
icebergMetadataWriter (#3672)
eda83a259 is described below
commit eda83a25997d23bfe4266453ebdfe9bd5c641a40
Author: Zihan Li <[email protected]>
AuthorDate: Thu Apr 13 09:55:56 2023 -0700
[GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter
(#3672)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter
* Revert "[GOBBLIN-1810] Support general iceberg catalog in
icebergMetadataWriter"
This reverts commit b0844e8d7740b9eaa21132604f532a964c3f9e52.
* [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter
* fix unit test
* add java doc
---------
Co-authored-by: Zihan Li <[email protected]>
---
.../apache/gobblin/iceberg/GobblinMCEProducer.java | 9 +++
.../iceberg/writer/IcebergMetadataWriter.java | 81 +++++++++++++---------
.../iceberg/writer/IcebergMetadataWriterTest.java | 42 +++++------
3 files changed, 77 insertions(+), 55 deletions(-)
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
index b037cb635..0f0061ed2 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
@@ -46,6 +46,7 @@ import org.apache.gobblin.metadata.IntegerLongPair;
import org.apache.gobblin.metadata.OperationType;
import org.apache.gobblin.metadata.SchemaSource;
import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.writer.PartitionedDataWriter;
@@ -149,6 +150,14 @@ public abstract class GobblinMCEProducer implements
Closeable {
regProperties.put(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME,
state.getProp(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME));
}
+ if (state.contains(HiveRegistrationPolicyBase.HIVE_TABLE_NAME)) {
+ regProperties.put(HiveRegistrationPolicyBase.HIVE_TABLE_NAME,
+ state.getProp(HiveRegistrationPolicyBase.HIVE_TABLE_NAME));
+ }
+ if (state.contains(KafkaSource.TOPIC_NAME)) {
+ regProperties.put(KafkaSource.TOPIC_NAME,
+ state.getProp(KafkaSource.TOPIC_NAME));
+ }
if
(state.contains(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES)) {
regProperties.put(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES,
state.getProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES));
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 9c2c4fdf1..27a2629e5 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -48,12 +48,14 @@ import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.hive.writer.MetadataWriterKeys;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFiles;
@@ -68,12 +70,12 @@ import org.apache.iceberg.TableProperties;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.HiveCatalogs;
import org.apache.iceberg.types.Types;
import org.joda.time.DateTime;
@@ -186,9 +188,9 @@ public class IcebergMetadataWriter implements
MetadataWriter {
private final Map<TableIdentifier, String> tableTopicPartitionMap;
@Getter
private final KafkaSchemaRegistry schemaRegistry;
- private final Map<TableIdentifier, TableMetadata> tableMetadataMap;
+ protected final Map<TableIdentifier, TableMetadata> tableMetadataMap;
@Setter
- protected HiveCatalog catalog;
+ protected Catalog catalog;
protected final Configuration conf;
protected final ReadWriteLock readWriteLock;
private final HiveLock locks;
@@ -330,7 +332,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
tableMetadata.lowestGMCEEmittedTime =
Long.min(tableMetadata.lowestGMCEEmittedTime, gmce.getGMCEmittedTime());
switch (gmce.getOperationType()) {
case add_files: {
- updateTableProperty(tableSpec, tid);
+ updateTableProperty(tableSpec, tid, gmce);
addFiles(gmce, newSpecsMap, table, tableMetadata);
if (gmce.getAuditCountMap() != null &&
auditWhitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(),
tableSpec.getTable().getTableName())) {
@@ -342,7 +344,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
break;
}
case rewrite_files: {
- updateTableProperty(tableSpec, tid);
+ updateTableProperty(tableSpec, tid, gmce);
rewriteFiles(gmce, newSpecsMap, oldSpecsMap, table, tableMetadata);
break;
}
@@ -351,7 +353,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
break;
}
case change_property: {
- updateTableProperty(tableSpec, tid);
+ updateTableProperty(tableSpec, tid, gmce);
if (gmce.getTopicPartitionOffsetsRange() != null) {
mergeOffsets(gmce, tid);
}
@@ -418,7 +420,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
}
}
- private void updateTableProperty(HiveSpec tableSpec, TableIdentifier tid) {
+ protected void updateTableProperty(HiveSpec tableSpec, TableIdentifier tid,
GobblinMetadataChangeEvent gmce) {
org.apache.hadoop.hive.metastore.api.Table table =
HiveMetaStoreUtils.getTable(tableSpec.getTable());
TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t ->
new TableMetadata());
tableMetadata.newProperties =
Optional.of(IcebergUtils.getTableProperties(table));
@@ -449,7 +451,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
.expireAfterAccess(conf.getInt(MetadataWriter.CACHE_EXPIRING_TIME,
MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
.build()));
- Cache<String, Schema> candidate = tableMetadata.candidateSchemas.get();
+ Cache<String, Pair<Schema, String>> candidate =
tableMetadata.candidateSchemas.get();
try {
switch (gmce.getSchemaSource()) {
case SCHEMAREGISTRY: {
@@ -457,15 +459,15 @@ public class IcebergMetadataWriter implements
MetadataWriter {
String createdOn = AvroUtils.getSchemaCreationTime(schema);
if (createdOn == null) {
candidate.put(DEFAULT_CREATION_TIME,
- IcebergUtils.getIcebergSchema(gmce.getTableSchema(),
hiveTable).tableSchema);
+ Pair.of(IcebergUtils.getIcebergSchema(gmce.getTableSchema(),
hiveTable).tableSchema, gmce.getTableSchema()));
} else if (!createdOn.equals(lastSchemaVersion)) {
- candidate.put(createdOn,
IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
+ candidate.put(createdOn,
Pair.of(IcebergUtils.getIcebergSchema(gmce.getTableSchema(),
hiveTable).tableSchema, gmce.getTableSchema()));
}
break;
}
case EVENT: {
candidate.put(DEFAULT_CREATION_TIME,
- IcebergUtils.getIcebergSchema(gmce.getTableSchema(),
hiveTable).tableSchema);
+ Pair.of(IcebergUtils.getIcebergSchema(gmce.getTableSchema(),
hiveTable).tableSchema, gmce.getTableSchema()));
break;
}
case NONE: {
@@ -780,6 +782,21 @@ public class IcebergMetadataWriter implements
MetadataWriter {
return partitionVal;
}
+ /**
+ * We will firstly try to use datasetOffsetRange to get the topic name, as
the pattern for datasetOffsetRange key should be ({topicName}-{partitionNumber})
+ * In case there is no datasetOffsetRange, we fall back to the table
property that we set previously for "topic.name"
+ * @return kafka topic name for this table
+ */
+ protected String getTopicName(TableIdentifier tid, TableMetadata
tableMetadata) {
+ if (tableMetadata.dataOffsetRange.isPresent()) {
+ String topicPartitionString =
tableMetadata.dataOffsetRange.get().keySet().iterator().next();
+ //In case the topic name is not the table name or the topic name
contains '-'
+ return topicPartitionString.substring(0,
topicPartitionString.lastIndexOf('-'));
+ }
+ return tableMetadata.newProperties.or(
+
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties()))).get(TOPIC_NAME_KEY);
+ }
+
/**
* For flush of each table, we do the following logic:
* 1. Commit the appendFiles if it exist
@@ -801,12 +818,14 @@ public class IcebergMetadataWriter implements
MetadataWriter {
Transaction transaction = tableMetadata.transaction.get();
Map<String, String> props = tableMetadata.newProperties.or(
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
- String topic = props.get(TOPIC_NAME_KEY);
+ //Set data offset range
+ setDatasetOffsetRange(tableMetadata, props);
+ String topicName = getTopicName(tid, tableMetadata);
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
- sendAuditCounts(topic, tableMetadata.serializedAuditCountMaps);
+ sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
if (tableMetadata.completenessEnabled) {
- checkAndUpdateCompletenessWatermark(tableMetadata, topic,
tableMetadata.datePartitions, props);
+ checkAndUpdateCompletenessWatermark(tableMetadata, topicName,
tableMetadata.datePartitions, props);
}
}
if (tableMetadata.deleteFiles.isPresent()) {
@@ -817,15 +836,15 @@ public class IcebergMetadataWriter implements
MetadataWriter {
if(!tableMetadata.appendFiles.isPresent() &&
!tableMetadata.deleteFiles.isPresent()
&& tableMetadata.completenessEnabled) {
if (tableMetadata.completionWatermark >
DEFAULT_COMPLETION_WATERMARK) {
- log.info(String.format("Checking kafka audit for %s on
change_property ", topic));
+ log.info(String.format("Checking kafka audit for %s on
change_property ", topicName));
SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
ZonedDateTime prevWatermarkDT =
Instant.ofEpochMilli(tableMetadata.completionWatermark).atZone(ZoneId.of(this.timeZone));
timestamps.add(TimeIterator.inc(prevWatermarkDT,
TimeIterator.Granularity.valueOf(this.auditCheckGranularity), 1));
- checkAndUpdateCompletenessWatermark(tableMetadata, topic,
timestamps, props);
+ checkAndUpdateCompletenessWatermark(tableMetadata, topicName,
timestamps, props);
} else {
log.info(String.format("Need valid watermark, current watermark is
%s, Not checking kafka audit for %s",
- tableMetadata.completionWatermark, topic));
+ tableMetadata.completionWatermark, topicName));
}
}
@@ -842,14 +861,6 @@ public class IcebergMetadataWriter implements
MetadataWriter {
props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
Integer.toString(
conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
- //Set data offset range
- boolean containOffsetRange = setDatasetOffsetRange(tableMetadata,
props);
- String topicName = tableName;
- if (containOffsetRange) {
- String topicPartitionString =
tableMetadata.dataOffsetRange.get().keySet().iterator().next();
- //In case the topic name is not the table name or the topic name
contains '-'
- topicName = topicPartitionString.substring(0,
topicPartitionString.lastIndexOf('-'));
- }
//Update schema(commit)
updateSchema(tableMetadata, props, topicName);
//Update properties
@@ -882,7 +893,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
@Override
public void reset(String dbName, String tableName) throws IOException {
- this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
+ this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
}
/**
@@ -952,7 +963,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
long timestampMillis = timestampDT.toInstant().toEpochMilli();
ZonedDateTime auditCountCheckLowerBoundDT =
TimeIterator.dec(timestampDT, granularity, 1);
if (auditCountVerifier.get().isComplete(topicName,
- auditCountCheckLowerBoundDT.toInstant().toEpochMilli(),
timestampMillis)) {
+ auditCountCheckLowerBoundDT.toInstant().toEpochMilli(),
timestampMillis)) {
completionWatermark = timestampMillis;
// Also persist the watermark into State object to share this with
other MetadataWriters
// we enforce ourselves to always use lower-cased table name here
@@ -1026,7 +1037,8 @@ public class IcebergMetadataWriter implements
MetadataWriter {
Cache candidates = tableMetadata.candidateSchemas.get();
//Only have default schema, so either we calculate schema from event
or the schema does not have creation time, directly update it
if (candidates.size() == 1 &&
candidates.getIfPresent(DEFAULT_CREATION_TIME) != null) {
- updateSchemaHelper(DEFAULT_CREATION_TIME, (Schema)
candidates.getIfPresent(DEFAULT_CREATION_TIME), props,
+ updateSchemaHelper(DEFAULT_CREATION_TIME,
+ (Pair<Schema, String>)
candidates.getIfPresent(DEFAULT_CREATION_TIME), props,
tableMetadata.table.get());
} else {
//update schema if candidates contains the schema that has the same
creation time with the latest schema
@@ -1037,7 +1049,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
log.warn(
"Schema from schema registry does not contain creation time,
check config for schema registry class");
} else if (candidates.getIfPresent(creationTime) != null) {
- updateSchemaHelper(creationTime, (Schema)
candidates.getIfPresent(creationTime), props,
+ updateSchemaHelper(creationTime, (Pair<Schema, String>)
candidates.getIfPresent(creationTime), props,
tableMetadata.table.get());
}
}
@@ -1047,10 +1059,11 @@ public class IcebergMetadataWriter implements
MetadataWriter {
}
}
- private void updateSchemaHelper(String schemaCreationTime, Schema schema,
Map<String, String> props, Table table) {
+ private void updateSchemaHelper(String schemaCreationTime, Pair<Schema,
String> schema, Map<String, String> props, Table table) {
try {
- table.updateSchema().unionByNameWith(schema).commit();
+ table.updateSchema().unionByNameWith(schema.getLeft()).commit();
props.put(SCHEMA_CREATION_TIME_KEY, schemaCreationTime);
+
props.put(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
schema.getRight());
} catch (Exception e) {
log.error("Cannot update schema to " + schema.toString() + "for table "
+ table.location(), e);
}
@@ -1122,7 +1135,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
*
* Also note the difference with {@link org.apache.iceberg.TableMetadata}.
*/
- private class TableMetadata {
+ public class TableMetadata {
Optional<Table> table = Optional.absent();
/**
@@ -1133,10 +1146,10 @@ public class IcebergMetadataWriter implements
MetadataWriter {
Optional<Transaction> transaction = Optional.absent();
private Optional<AppendFiles> appendFiles = Optional.absent();
private Optional<DeleteFiles> deleteFiles = Optional.absent();
+ public Optional<Map<String, String>> newProperties = Optional.absent();
Optional<Map<String, String>> lastProperties = Optional.absent();
- Optional<Map<String, String>> newProperties = Optional.absent();
- Optional<Cache<String, Schema>> candidateSchemas = Optional.absent();
+ Optional<Cache<String, Pair<Schema, String>>> candidateSchemas =
Optional.absent();
Optional<Map<String, List<Range>>> dataOffsetRange = Optional.absent();
Optional<String> lastSchemaVersion = Optional.absent();
Optional<Long> lowWatermark = Optional.absent();
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index b6174ec7d..294ef08ab 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -124,11 +124,11 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
startMetastore();
tmpDir = Files.createTempDir();
- hourlyDataFile_1 = new File(tmpDir,
"testDB/testIcebergTable/hourly/2020/03/17/08/data.avro");
+ hourlyDataFile_1 = new File(tmpDir,
"testDB/testTopic/hourly/2020/03/17/08/data.avro");
Files.createParentDirs(hourlyDataFile_1);
- hourlyDataFile_2 = new File(tmpDir,
"testDB/testIcebergTable/hourly/2020/03/17/09/data.avro");
+ hourlyDataFile_2 = new File(tmpDir,
"testDB/testTopic/hourly/2020/03/17/09/data.avro");
Files.createParentDirs(hourlyDataFile_2);
- dailyDataFile = new File(tmpDir,
"testDB/testIcebergTable/daily/2020/03/17/data.avro");
+ dailyDataFile = new File(tmpDir,
"testDB/testTopic/daily/2020/03/17/data.avro");
Files.createParentDirs(dailyDataFile);
dataDir = new File(hourlyDataFile_1.getParent());
Assert.assertTrue(dataDir.exists());
@@ -139,7 +139,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
.setDatasetIdentifier(DatasetIdentifier.newBuilder()
.setDataOrigin(DataOrigin.EI)
.setDataPlatformUrn("urn:namespace:dataPlatform:hdfs")
- .setNativeName(new File(tmpDir,
"testDB/testIcebergTable").getAbsolutePath())
+ .setNativeName(new File(tmpDir,
"testDB/testTopic").getAbsolutePath())
.build())
.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "0-1000").build())
.setFlowId("testFlow")
@@ -221,7 +221,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Table table =
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertFalse(table.properties().containsKey("offset.range.testTopic-1"));
Assert.assertEquals(table.location(),
- new File(tmpDir,
"testDB/testIcebergTable/_iceberg_metadata/").getAbsolutePath() + "/" + dbName);
+ new File(tmpDir,
"testDB/testTopic/_iceberg_metadata/").getAbsolutePath() + "/" + dbName);
gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "1000-2000").build());
GenericRecord genericGmce_1000_2000 =
GenericData.get().deepCopy(gmce.getSchema(), gmce);
@@ -363,11 +363,11 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().size(), 1);
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(),
1);
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
- .get(new File(tmpDir, "testDB/testIcebergTable").getAbsolutePath())
- .get("hivedb.testIcebergTable").get(0).lowWatermark, 50L);
+ .get(new File(tmpDir, "testDB/testTopic").getAbsolutePath())
+ .get("hivedb.testTopic").get(0).lowWatermark, 50L);
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
- .get(new File(tmpDir, "testDB/testIcebergTable").getAbsolutePath())
- .get("hivedb.testIcebergTable").get(0).highWatermark, 52L);
+ .get(new File(tmpDir, "testDB/testTopic").getAbsolutePath())
+ .get("hivedb.testTopic").get(0).highWatermark, 52L);
// No events sent yet since the topic has not been flushed
Assert.assertEquals(eventsSent.size(), 0);
@@ -378,7 +378,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
// Since this topic has been flushed, there should be an event sent for
previous failure, and the table
// should be removed from the error map
Assert.assertEquals(eventsSent.size(), 1);
-
Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.TABLE_NAME_KEY),
"testIcebergTable");
+
Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.TABLE_NAME_KEY),
"testTopic");
Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.GMCE_LOW_WATERMARK),
"50");
Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.GMCE_HIGH_WATERMARK),
"52");
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(),
0);
@@ -398,7 +398,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
// Creating a copy of gmce with static type in GenericRecord to work with
writeEnvelop method
// without risking running into type cast runtime error.
gmce.setOperationType(OperationType.add_files);
- File hourlyFile = new File(tmpDir,
"testDB/testIcebergTable/hourly/2021/09/16/10/data.avro");
+ File hourlyFile = new File(tmpDir,
"testDB/testTopic/hourly/2021/09/16/10/data.avro");
long timestampMillis = 1631811600000L;
Files.createParentDirs(hourlyFile);
writeRecord(hourlyFile);
@@ -421,13 +421,13 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
// Test when completeness watermark = -1 bootstrap case
KafkaAuditCountVerifier verifier =
Mockito.mock(TestAuditCountVerifier.class);
- Mockito.when(verifier.isComplete("testIcebergTable", timestampMillis -
TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
+ Mockito.when(verifier.isComplete("testTopic", timestampMillis -
TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
IcebergMetadataWriter imw = (IcebergMetadataWriter)
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next();
imw.setAuditCountVerifier(verifier);
gobblinMCEWriterWithCompletness.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
//completeness watermark = "2020-09-16-10"
- Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY),
"testIcebergTable");
+ Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY),
"America/Los_Angeles");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY),
String.valueOf(timestampMillis));
// 1631811600000L correspond to 2020-09-16-10 in PT
@@ -437,7 +437,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Assert.assertTrue(dfl.hasNext());
// Test when completeness watermark is still "2021-09-16-10" but have a
late file for "2021-09-16-09"
- File hourlyFile1 = new File(tmpDir,
"testDB/testIcebergTable/hourly/2021/09/16/09/data1.avro");
+ File hourlyFile1 = new File(tmpDir,
"testDB/testTopic/hourly/2021/09/16/09/data1.avro");
Files.createParentDirs(hourlyFile1);
writeRecord(hourlyFile1);
gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
@@ -460,7 +460,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Assert.assertEquals((int) dfl.next().partition().get(1, Integer.class), 1);
// Test when completeness watermark will advance to "2021-09-16-11"
- File hourlyFile2 = new File(tmpDir,
"testDB/testIcebergTable/hourly/2021/09/16/11/data.avro");
+ File hourlyFile2 = new File(tmpDir,
"testDB/testTopic/hourly/2021/09/16/11/data.avro");
long timestampMillis1 = timestampMillis + TimeUnit.HOURS.toMillis(1);
Files.createParentDirs(hourlyFile2);
writeRecord(hourlyFile2);
@@ -476,7 +476,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(60L))));
- Mockito.when(verifier.isComplete("testIcebergTable", timestampMillis1 -
TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
+ Mockito.when(verifier.isComplete("testTopic", timestampMillis1 -
TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
gobblinMCEWriterWithCompletness.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY),
String.valueOf(timestampMillis1));
@@ -495,7 +495,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Table table =
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
long watermark =
Long.parseLong(table.properties().get(COMPLETION_WATERMARK_KEY));
long expectedWatermark = watermark + TimeUnit.HOURS.toMillis(1);
- File hourlyFile2 = new File(tmpDir,
"testDB/testIcebergTable/hourly/2021/09/16/11/data.avro");
+ File hourlyFile2 = new File(tmpDir,
"testDB/testTopic/hourly/2021/09/16/11/data.avro");
gmce.setOldFilePrefixes(null);
gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
.setFilePath(hourlyFile2.toString())
@@ -511,14 +511,14 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
new LongWatermark(65L))));
KafkaAuditCountVerifier verifier =
Mockito.mock(TestAuditCountVerifier.class);
- Mockito.when(verifier.isComplete("testIcebergTable", watermark,
expectedWatermark)).thenReturn(true);
+ Mockito.when(verifier.isComplete("testTopic", watermark,
expectedWatermark)).thenReturn(true);
((IcebergMetadataWriter)
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
gobblinMCEWriterWithCompletness.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertEquals(table.properties().get("offset.range.testTopic-1"),
"0-7000");
Assert.assertEquals(table.spec().fields().get(1).name(), "late");
- Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY),
"testIcebergTable");
+ Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY),
"America/Los_Angeles");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY),
String.valueOf(expectedWatermark));
@@ -558,7 +558,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
partitionValue = "2020-03-17-00";
}
return Optional.of(new
HivePartition.Builder().withPartitionValues(Lists.newArrayList(partitionValue))
- .withDbName("hivedb").withTableName("testIcebergTable").build());
+ .withDbName("hivedb").withTableName("testTopic").build());
}
@Override
protected List<HiveTable> getTables(Path path) throws IOException {
@@ -577,7 +577,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
if (path.toString().contains("testFaultTolerant")) {
return Lists.newArrayList("testFaultTolerantIcebergTable");
}
- return Lists.newArrayList("testIcebergTable");
+ return Lists.newArrayList("testTopic");
}
}