This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new eda83a259 [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter (#3672)
eda83a259 is described below

commit eda83a25997d23bfe4266453ebdfe9bd5c641a40
Author: Zihan Li <[email protected]>
AuthorDate: Thu Apr 13 09:55:56 2023 -0700

    [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter (#3672)
    
    * address comments
    
    * use connection manager when httpclient is not closeable
    
    * [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter
    
    * Revert "[GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter"
    
    This reverts commit b0844e8d7740b9eaa21132604f532a964c3f9e52.
    
    * [GOBBLIN-1810] Support general iceberg catalog in icebergMetadataWriter
    
    * fix unit test
    
    * add java doc
    
    ---------
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../apache/gobblin/iceberg/GobblinMCEProducer.java |  9 +++
 .../iceberg/writer/IcebergMetadataWriter.java      | 81 +++++++++++++---------
 .../iceberg/writer/IcebergMetadataWriterTest.java  | 42 +++++------
 3 files changed, 77 insertions(+), 55 deletions(-)
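
The headline change in the diff below replaces the Hive-specific HiveCatalog
field with Iceberg's generic org.apache.iceberg.catalog.Catalog interface, so
any catalog implementation can now be injected into IcebergMetadataWriter via
its @Setter. As a minimal sketch (not code from this commit; the class name,
method, and property values below are illustrative only), an alternative
catalog could be loaded reflectively with Iceberg's CatalogUtil:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.CatalogUtil;
    import org.apache.iceberg.catalog.Catalog;

    public class CatalogWiringSketch {
      // Instantiates the named Catalog implementation reflectively and
      // initializes it with the given properties; the result could then be
      // handed to IcebergMetadataWriter through its catalog setter.
      public static Catalog loadConfiguredCatalog(Configuration conf) {
        Map<String, String> props = new HashMap<>();
        // "warehouse" is a standard Iceberg catalog property; the path here
        // is purely illustrative.
        props.put("warehouse", "/tmp/iceberg/warehouse");
        return CatalogUtil.loadCatalog(
            "org.apache.iceberg.hadoop.HadoopCatalog", // any Catalog impl
            "sketch_catalog", props, conf);
      }
    }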

diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
index b037cb635..0f0061ed2 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
@@ -46,6 +46,7 @@ import org.apache.gobblin.metadata.IntegerLongPair;
 import org.apache.gobblin.metadata.OperationType;
 import org.apache.gobblin.metadata.SchemaSource;
 import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
 import org.apache.gobblin.util.ClustersNames;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.apache.gobblin.writer.PartitionedDataWriter;
@@ -149,6 +150,14 @@ public abstract class GobblinMCEProducer implements Closeable {
       regProperties.put(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME,
           state.getProp(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME));
     }
+    if (state.contains(HiveRegistrationPolicyBase.HIVE_TABLE_NAME)) {
+      regProperties.put(HiveRegistrationPolicyBase.HIVE_TABLE_NAME,
+          state.getProp(HiveRegistrationPolicyBase.HIVE_TABLE_NAME));
+    }
+    if (state.contains(KafkaSource.TOPIC_NAME)) {
+      regProperties.put(KafkaSource.TOPIC_NAME,
+          state.getProp(KafkaSource.TOPIC_NAME));
+    }
     if (state.contains(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES)) {
       regProperties.put(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES,
           state.getProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES));
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 9c2c4fdf1..27a2629e5 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -48,12 +48,14 @@ import java.util.stream.Stream;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificData;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gobblin.hive.writer.MetadataWriterKeys;
 import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFiles;
@@ -68,12 +70,12 @@ import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.HiveCatalogs;
 import org.apache.iceberg.types.Types;
 import org.joda.time.DateTime;
@@ -186,9 +188,9 @@ public class IcebergMetadataWriter implements MetadataWriter {
   private final Map<TableIdentifier, String> tableTopicPartitionMap;
   @Getter
   private final KafkaSchemaRegistry schemaRegistry;
-  private final Map<TableIdentifier, TableMetadata> tableMetadataMap;
+  protected final Map<TableIdentifier, TableMetadata> tableMetadataMap;
   @Setter
-  protected HiveCatalog catalog;
+  protected Catalog catalog;
   protected final Configuration conf;
   protected final ReadWriteLock readWriteLock;
   private final HiveLock locks;
@@ -330,7 +332,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
     tableMetadata.lowestGMCEEmittedTime = Long.min(tableMetadata.lowestGMCEEmittedTime, gmce.getGMCEmittedTime());
     switch (gmce.getOperationType()) {
       case add_files: {
-        updateTableProperty(tableSpec, tid);
+        updateTableProperty(tableSpec, tid, gmce);
         addFiles(gmce, newSpecsMap, table, tableMetadata);
         if (gmce.getAuditCountMap() != null && auditWhitelistBlacklist.acceptTable(tableSpec.getTable().getDbName(),
             tableSpec.getTable().getTableName())) {
@@ -342,7 +344,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
         break;
       }
       case rewrite_files: {
-        updateTableProperty(tableSpec, tid);
+        updateTableProperty(tableSpec, tid, gmce);
         rewriteFiles(gmce, newSpecsMap, oldSpecsMap, table, tableMetadata);
         break;
       }
@@ -351,7 +353,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
         break;
       }
       case change_property: {
-        updateTableProperty(tableSpec, tid);
+        updateTableProperty(tableSpec, tid, gmce);
         if (gmce.getTopicPartitionOffsetsRange() != null) {
           mergeOffsets(gmce, tid);
         }
@@ -418,7 +420,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
     }
   }
 
-  private void updateTableProperty(HiveSpec tableSpec, TableIdentifier tid) {
+  protected void updateTableProperty(HiveSpec tableSpec, TableIdentifier tid, GobblinMetadataChangeEvent gmce) {
     org.apache.hadoop.hive.metastore.api.Table table = HiveMetaStoreUtils.getTable(tableSpec.getTable());
     TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t -> new TableMetadata());
     tableMetadata.newProperties = Optional.of(IcebergUtils.getTableProperties(table));
@@ -449,7 +451,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
         .expireAfterAccess(conf.getInt(MetadataWriter.CACHE_EXPIRING_TIME,
             MetadataWriter.DEFAULT_CACHE_EXPIRING_TIME), TimeUnit.HOURS)
         .build()));
-    Cache<String, Schema> candidate = tableMetadata.candidateSchemas.get();
+    Cache<String, Pair<Schema, String>> candidate = tableMetadata.candidateSchemas.get();
     try {
       switch (gmce.getSchemaSource()) {
         case SCHEMAREGISTRY: {
@@ -457,15 +459,15 @@ public class IcebergMetadataWriter implements MetadataWriter {
           String createdOn = AvroUtils.getSchemaCreationTime(schema);
           if (createdOn == null) {
             candidate.put(DEFAULT_CREATION_TIME,
-                IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
+                Pair.of(IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema, gmce.getTableSchema()));
           } else if (!createdOn.equals(lastSchemaVersion)) {
-            candidate.put(createdOn, IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
+            candidate.put(createdOn, Pair.of(IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema, gmce.getTableSchema()));
           }
           break;
         }
         case EVENT: {
           candidate.put(DEFAULT_CREATION_TIME,
-              IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema);
+              Pair.of(IcebergUtils.getIcebergSchema(gmce.getTableSchema(), hiveTable).tableSchema, gmce.getTableSchema()));
           break;
         }
         case NONE: {
@@ -780,6 +782,21 @@ public class IcebergMetadataWriter implements MetadataWriter {
     return partitionVal;
   }
 
+  /**
+   * We first try to derive the topic name from datasetOffsetRange, since its keys follow the pattern {topicName}-{partitionNumber}.
+   * If there is no datasetOffsetRange, we fall back to the "topic.name" table property that we set previously.
+   * @return kafka topic name for this table
+   */
+  protected String getTopicName(TableIdentifier tid, TableMetadata tableMetadata) {
+    if (tableMetadata.dataOffsetRange.isPresent()) {
+      String topicPartitionString = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
+      //In case the topic name is not the table name or the topic name contains '-'
+      return topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
+    }
+    return tableMetadata.newProperties.or(
+        Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties()))).get(TOPIC_NAME_KEY);
+  }
+
   /**
    * For flush of each table, we do the following logic:
    * 1. Commit the appendFiles if it exists
@@ -801,12 +818,14 @@ public class IcebergMetadataWriter implements MetadataWriter {
         Transaction transaction = tableMetadata.transaction.get();
         Map<String, String> props = tableMetadata.newProperties.or(
             Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
-        String topic = props.get(TOPIC_NAME_KEY);
+        //Set data offset range
+        setDatasetOffsetRange(tableMetadata, props);
+        String topicName = getTopicName(tid, tableMetadata);
         if (tableMetadata.appendFiles.isPresent()) {
           tableMetadata.appendFiles.get().commit();
-          sendAuditCounts(topic, tableMetadata.serializedAuditCountMaps);
+          sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
           if (tableMetadata.completenessEnabled) {
-            checkAndUpdateCompletenessWatermark(tableMetadata, topic, tableMetadata.datePartitions, props);
+            checkAndUpdateCompletenessWatermark(tableMetadata, topicName, tableMetadata.datePartitions, props);
           }
         }
         if (tableMetadata.deleteFiles.isPresent()) {
@@ -817,15 +836,15 @@ public class IcebergMetadataWriter implements MetadataWriter {
         if(!tableMetadata.appendFiles.isPresent() && !tableMetadata.deleteFiles.isPresent()
             && tableMetadata.completenessEnabled) {
           if (tableMetadata.completionWatermark > DEFAULT_COMPLETION_WATERMARK) {
-            log.info(String.format("Checking kafka audit for %s on change_property ", topic));
+            log.info(String.format("Checking kafka audit for %s on change_property ", topicName));
             SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
            ZonedDateTime prevWatermarkDT =
                Instant.ofEpochMilli(tableMetadata.completionWatermark).atZone(ZoneId.of(this.timeZone));
            timestamps.add(TimeIterator.inc(prevWatermarkDT, TimeIterator.Granularity.valueOf(this.auditCheckGranularity), 1));
-            checkAndUpdateCompletenessWatermark(tableMetadata, topic, timestamps, props);
+            checkAndUpdateCompletenessWatermark(tableMetadata, topicName, timestamps, props);
           } else {
             log.info(String.format("Need valid watermark, current watermark is 
%s, Not checking kafka audit for %s",
-                tableMetadata.completionWatermark, topic));
+                tableMetadata.completionWatermark, topicName));
           }
         }
 
@@ -842,14 +861,6 @@ public class IcebergMetadataWriter implements MetadataWriter {
         props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, Integer.toString(
             conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
                 TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
-        //Set data offset range
-        boolean containOffsetRange = setDatasetOffsetRange(tableMetadata, props);
-        String topicName = tableName;
-        if (containOffsetRange) {
-          String topicPartitionString = tableMetadata.dataOffsetRange.get().keySet().iterator().next();
-          //In case the topic name is not the table name or the topic name contains '-'
-          topicName = topicPartitionString.substring(0, topicPartitionString.lastIndexOf('-'));
-        }
         //Update schema(commit)
         updateSchema(tableMetadata, props, topicName);
         //Update properties
@@ -882,7 +893,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
 
   @Override
   public void reset(String dbName, String tableName) throws IOException {
-      this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
+    this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
   }
 
   /**
@@ -952,7 +963,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
          ZonedDateTime auditCountCheckLowerBoundDT = TimeIterator.dec(timestampDT, granularity, 1);
          if (auditCountVerifier.get().isComplete(topicName,
-                  auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
+              auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
            // Also persist the watermark into State object to share this with other MetadataWriters
             // we enforce ourselves to always use lower-cased table name here
@@ -1026,7 +1037,8 @@ public class IcebergMetadataWriter implements MetadataWriter {
         Cache candidates = tableMetadata.candidateSchemas.get();
        //Only have default schema, so either we calculate schema from event or the schema does not have creation time, directly update it
        if (candidates.size() == 1 && candidates.getIfPresent(DEFAULT_CREATION_TIME) != null) {
-          updateSchemaHelper(DEFAULT_CREATION_TIME, (Schema) candidates.getIfPresent(DEFAULT_CREATION_TIME), props,
+          updateSchemaHelper(DEFAULT_CREATION_TIME,
+              (Pair<Schema, String>) candidates.getIfPresent(DEFAULT_CREATION_TIME), props,
               tableMetadata.table.get());
         } else {
          //update schema if candidates contains the schema that has the same creation time with the latest schema
@@ -1037,7 +1049,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
             log.warn(
                 "Schema from schema registry does not contain creation time, 
check config for schema registry class");
           } else if (candidates.getIfPresent(creationTime) != null) {
-            updateSchemaHelper(creationTime, (Schema) candidates.getIfPresent(creationTime), props,
+            updateSchemaHelper(creationTime, (Pair<Schema, String>) candidates.getIfPresent(creationTime), props,
                 tableMetadata.table.get());
           }
         }
@@ -1047,10 +1059,11 @@ public class IcebergMetadataWriter implements MetadataWriter {
     }
   }
 
-  private void updateSchemaHelper(String schemaCreationTime, Schema schema, Map<String, String> props, Table table) {
+  private void updateSchemaHelper(String schemaCreationTime, Pair<Schema, String> schema, Map<String, String> props, Table table) {
     try {
-      table.updateSchema().unionByNameWith(schema).commit();
+      table.updateSchema().unionByNameWith(schema.getLeft()).commit();
       props.put(SCHEMA_CREATION_TIME_KEY, schemaCreationTime);
+      props.put(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), schema.getRight());
     } catch (Exception e) {
       log.error("Cannot update schema to " + schema.toString() + "for table " 
+ table.location(), e);
     }
@@ -1122,7 +1135,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
    *
    * Also note the difference with {@link org.apache.iceberg.TableMetadata}.
    */
-  private class TableMetadata {
+  public class TableMetadata {
     Optional<Table> table = Optional.absent();
 
     /**
@@ -1133,10 +1146,10 @@ public class IcebergMetadataWriter implements MetadataWriter {
     Optional<Transaction> transaction = Optional.absent();
     private Optional<AppendFiles> appendFiles = Optional.absent();
     private Optional<DeleteFiles> deleteFiles = Optional.absent();
+    public Optional<Map<String, String>> newProperties = Optional.absent();
 
     Optional<Map<String, String>> lastProperties = Optional.absent();
-    Optional<Map<String, String>> newProperties = Optional.absent();
-    Optional<Cache<String, Schema>> candidateSchemas = Optional.absent();
+    Optional<Cache<String, Pair<Schema, String>>> candidateSchemas = Optional.absent();
     Optional<Map<String, List<Range>>> dataOffsetRange = Optional.absent();
     Optional<String> lastSchemaVersion = Optional.absent();
     Optional<Long> lowWatermark = Optional.absent();
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index b6174ec7d..294ef08ab 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -124,11 +124,11 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     startMetastore();
 
     tmpDir = Files.createTempDir();
-    hourlyDataFile_1 = new File(tmpDir, "testDB/testIcebergTable/hourly/2020/03/17/08/data.avro");
+    hourlyDataFile_1 = new File(tmpDir, "testDB/testTopic/hourly/2020/03/17/08/data.avro");
     Files.createParentDirs(hourlyDataFile_1);
-    hourlyDataFile_2 = new File(tmpDir, "testDB/testIcebergTable/hourly/2020/03/17/09/data.avro");
+    hourlyDataFile_2 = new File(tmpDir, "testDB/testTopic/hourly/2020/03/17/09/data.avro");
     Files.createParentDirs(hourlyDataFile_2);
-    dailyDataFile = new File(tmpDir, "testDB/testIcebergTable/daily/2020/03/17/data.avro");
+    dailyDataFile = new File(tmpDir, "testDB/testTopic/daily/2020/03/17/data.avro");
     Files.createParentDirs(dailyDataFile);
     dataDir = new File(hourlyDataFile_1.getParent());
     Assert.assertTrue(dataDir.exists());
@@ -139,7 +139,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
         .setDatasetIdentifier(DatasetIdentifier.newBuilder()
             .setDataOrigin(DataOrigin.EI)
             .setDataPlatformUrn("urn:namespace:dataPlatform:hdfs")
-            .setNativeName(new File(tmpDir, "testDB/testIcebergTable").getAbsolutePath())
+            .setNativeName(new File(tmpDir, "testDB/testTopic").getAbsolutePath())
             .build())
         .setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "0-1000").build())
         .setFlowId("testFlow")
@@ -221,7 +221,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     Assert.assertFalse(table.properties().containsKey("offset.range.testTopic-1"));
     Assert.assertEquals(table.location(),
-        new File(tmpDir, "testDB/testIcebergTable/_iceberg_metadata/").getAbsolutePath() + "/" + dbName);
+        new File(tmpDir, "testDB/testTopic/_iceberg_metadata/").getAbsolutePath() + "/" + dbName);
 
     gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, String>builder().put("testTopic-1", "1000-2000").build());
     GenericRecord genericGmce_1000_2000 = GenericData.get().deepCopy(gmce.getSchema(), gmce);
@@ -363,11 +363,11 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().size(), 1);
     Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(), 1);
     Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
-        .get(new File(tmpDir, "testDB/testIcebergTable").getAbsolutePath())
-        .get("hivedb.testIcebergTable").get(0).lowWatermark, 50L);
+        .get(new File(tmpDir, "testDB/testTopic").getAbsolutePath())
+        .get("hivedb.testTopic").get(0).lowWatermark, 50L);
     Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
-        .get(new File(tmpDir, "testDB/testIcebergTable").getAbsolutePath())
-        .get("hivedb.testIcebergTable").get(0).highWatermark, 52L);
+        .get(new File(tmpDir, "testDB/testTopic").getAbsolutePath())
+        .get("hivedb.testTopic").get(0).highWatermark, 52L);
 
     // No events sent yet since the topic has not been flushed
     Assert.assertEquals(eventsSent.size(), 0);
@@ -378,7 +378,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     // Since this topic has been flushed, there should be an event sent for previous failure, and the table
     // should be removed from the error map
     Assert.assertEquals(eventsSent.size(), 1);
-    Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.TABLE_NAME_KEY), "testIcebergTable");
+    Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.TABLE_NAME_KEY), "testTopic");
     Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.GMCE_LOW_WATERMARK), "50");
     Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.GMCE_HIGH_WATERMARK), "52");
     Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(), 0);
@@ -398,7 +398,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     // Creating a copy of gmce with static type in GenericRecord to work with writeEnvelop method
     // without risking running into type cast runtime error.
     gmce.setOperationType(OperationType.add_files);
-    File hourlyFile = new File(tmpDir, "testDB/testIcebergTable/hourly/2021/09/16/10/data.avro");
+    File hourlyFile = new File(tmpDir, "testDB/testTopic/hourly/2021/09/16/10/data.avro");
     long timestampMillis = 1631811600000L;
     Files.createParentDirs(hourlyFile);
     writeRecord(hourlyFile);
@@ -421,13 +421,13 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
 
     // Test when completeness watermark = -1 bootstrap case
     KafkaAuditCountVerifier verifier = Mockito.mock(TestAuditCountVerifier.class);
-    Mockito.when(verifier.isComplete("testIcebergTable", timestampMillis - TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
+    Mockito.when(verifier.isComplete("testTopic", timestampMillis - TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
     IcebergMetadataWriter imw = (IcebergMetadataWriter) gobblinMCEWriterWithCompletness.metadataWriters.iterator().next();
     imw.setAuditCountVerifier(verifier);
     gobblinMCEWriterWithCompletness.flush();
     table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     //completeness watermark = "2020-09-16-10"
-    Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testIcebergTable");
+    Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
     Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), "America/Los_Angeles");
     Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(timestampMillis));
     // 1631811600000L correspond to 2020-09-16-10 in PT
@@ -437,7 +437,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Assert.assertTrue(dfl.hasNext());
 
     // Test when completeness watermark is still "2021-09-16-10" but have a late file for "2021-09-16-09"
-    File hourlyFile1 = new File(tmpDir, "testDB/testIcebergTable/hourly/2021/09/16/09/data1.avro");
+    File hourlyFile1 = new File(tmpDir, "testDB/testTopic/hourly/2021/09/16/09/data1.avro");
     Files.createParentDirs(hourlyFile1);
     writeRecord(hourlyFile1);
     gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
@@ -460,7 +460,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Assert.assertEquals((int) dfl.next().partition().get(1, Integer.class), 1);
 
     // Test when completeness watermark will advance to "2021-09-16-11"
-    File hourlyFile2 = new File(tmpDir, "testDB/testIcebergTable/hourly/2021/09/16/11/data.avro");
+    File hourlyFile2 = new File(tmpDir, "testDB/testTopic/hourly/2021/09/16/11/data.avro");
     long timestampMillis1 = timestampMillis + TimeUnit.HOURS.toMillis(1);
     Files.createParentDirs(hourlyFile2);
     writeRecord(hourlyFile2);
@@ -476,7 +476,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
            new KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(60L))));
 
-    Mockito.when(verifier.isComplete("testIcebergTable", timestampMillis1 - TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
+    Mockito.when(verifier.isComplete("testTopic", timestampMillis1 - TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
     gobblinMCEWriterWithCompletness.flush();
     table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), String.valueOf(timestampMillis1));
@@ -495,7 +495,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
     Table table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     long watermark = Long.parseLong(table.properties().get(COMPLETION_WATERMARK_KEY));
     long expectedWatermark = watermark + TimeUnit.HOURS.toMillis(1);
-    File hourlyFile2 = new File(tmpDir, "testDB/testIcebergTable/hourly/2021/09/16/11/data.avro");
+    File hourlyFile2 = new File(tmpDir, "testDB/testTopic/hourly/2021/09/16/11/data.avro");
     gmce.setOldFilePrefixes(null);
     gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
         .setFilePath(hourlyFile2.toString())
@@ -511,14 +511,14 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
             new LongWatermark(65L))));
 
     KafkaAuditCountVerifier verifier = Mockito.mock(TestAuditCountVerifier.class);
-    Mockito.when(verifier.isComplete("testIcebergTable", watermark, expectedWatermark)).thenReturn(true);
+    Mockito.when(verifier.isComplete("testTopic", watermark, expectedWatermark)).thenReturn(true);
     ((IcebergMetadataWriter) gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
     gobblinMCEWriterWithCompletness.flush();
 
     table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), 
"0-7000");
     Assert.assertEquals(table.spec().fields().get(1).name(), "late");
-    Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), 
"testIcebergTable");
+    Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
     
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), 
"America/Los_Angeles");
     Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), 
String.valueOf(expectedWatermark));
 
@@ -558,7 +558,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
         partitionValue = "2020-03-17-00";
       }
      return Optional.of(new HivePartition.Builder().withPartitionValues(Lists.newArrayList(partitionValue))
-          .withDbName("hivedb").withTableName("testIcebergTable").build());
+          .withDbName("hivedb").withTableName("testTopic").build());
     }
     @Override
     protected List<HiveTable> getTables(Path path) throws IOException {
@@ -577,7 +577,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
       if (path.toString().contains("testFaultTolerant")) {
         return Lists.newArrayList("testFaultTolerantIcebergTable");
       }
-      return Lists.newArrayList("testIcebergTable");
+      return Lists.newArrayList("testTopic");
     }
   }
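
The new getTopicName in the diff above derives the topic from an offset-range
key of the form {topicName}-{partitionNumber} by cutting at the last '-',
which also tolerates topic names that themselves contain dashes. A standalone
sketch of just that parsing step (class and method names are illustrative):

    public class TopicNameParseSketch {
      // Keep everything before the LAST '-', so the partition suffix is
      // dropped even when the topic name itself contains dashes.
      static String topicFromOffsetRangeKey(String topicPartition) {
        return topicPartition.substring(0, topicPartition.lastIndexOf('-'));
      }

      public static void main(String[] args) {
        System.out.println(topicFromOffsetRangeKey("testTopic-1")); // testTopic
        System.out.println(topicFromOffsetRangeKey("my-topic-3"));  // my-topic
      }
    }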
 
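The candidateSchemas cache now stores Pair<Schema, String> instead of a bare
Schema: the left side is the computed Iceberg schema and the right side is the
original Avro schema string, which updateSchemaHelper persists under Hive's
avro.schema.literal table property. A minimal sketch of that two-part update
(the helper class and method are illustrative; the API calls mirror the diff):

    import java.util.Map;

    import org.apache.commons.lang3.tuple.Pair;
    import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.Table;

    class SchemaLiteralSketch {
      static void applySchema(Table table, Map<String, String> props,
          Pair<Schema, String> candidate) {
        // Left: merge the Iceberg schema into the table by field name.
        table.updateSchema().unionByNameWith(candidate.getLeft()).commit();
        // Right: record the raw Avro schema JSON so Hive-side readers see
        // the exact literal.
        props.put(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
            candidate.getRight());
      }
    }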
