This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 949b01927 [GOBBLIN-1835] Upgrade Iceberg Version from 0.11.1 to 1.2.0 
(#3697)
949b01927 is described below

commit 949b01927e642a40a1f5c208926925988910c064
Author: meethngala <[email protected]>
AuthorDate: Wed Jun 14 16:39:12 2023 -0700

    [GOBBLIN-1835] Upgrade Iceberg Version from 0.11.1 to 1.2.0 (#3697)
    
    * upgrade iceberg version to 1.2.0
    
    * address PR comments
    
    * resolve merge conflicts
    
    * fix checkstyle
    
    * update assert to verify error map elements in testFaultTolerant unit test
    
    ---------
    
    Co-authored-by: Meeth Gala <[email protected]>
---
 gobblin-data-management/build.gradle               |  2 +-
 .../data/management/copy/iceberg/IcebergTable.java |  2 +-
 gobblin-iceberg/build.gradle                       |  2 +-
 .../apache/gobblin/iceberg/Utils/IcebergUtils.java |  2 +
 .../iceberg/publisher/GobblinMCEPublisher.java     |  2 +-
 .../iceberg/writer/IcebergMetadataWriter.java      |  5 +-
 .../iceberg/writer/HiveMetadataWriterTest.java     |  1 -
 .../iceberg/writer/IcebergMetadataWriterTest.java  | 74 +++++++++++-----------
 gradle/scripts/defaultBuildProperties.gradle       |  2 +-
 gradle/scripts/dependencyDefinitions.gradle        |  1 +
 10 files changed, 48 insertions(+), 45 deletions(-)

diff --git a/gobblin-data-management/build.gradle 
b/gobblin-data-management/build.gradle
index c3b3129ea..8b25f8562 100644
--- a/gobblin-data-management/build.gradle
+++ b/gobblin-data-management/build.gradle
@@ -45,7 +45,7 @@ dependencies {
   compile externalDependency.junit
   compile externalDependency.jacksonMapperAsl
 
-  testCompile(group: 'org.apache.iceberg', name: 'iceberg-hive-metastore', 
version: '0.11.1', classifier: 'tests') {
+  testCompile(externalDependency.icebergHiveMetastoreTest) {
     transitive = false
   }
   testCompile('org.apache.hadoop:hadoop-common:2.6.0')
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index 6671ebdeb..529f53a45 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -161,7 +161,7 @@ public class IcebergTable {
         metadataFileLocation,
         snapshot.manifestListLocation(),
         // NOTE: unable to `.stream().map(m -> calcManifestFileInfo(m, 
tableOps.io()))` due to checked exception
-        skipManifestFileInfo ? Lists.newArrayList() : 
calcAllManifestFileInfos(snapshot.allManifests(), tableOps.io())
+        skipManifestFileInfo ? Lists.newArrayList() : 
calcAllManifestFileInfos(snapshot.allManifests(tableOps.io()), tableOps.io())
       );
   }
 
diff --git a/gobblin-iceberg/build.gradle b/gobblin-iceberg/build.gradle
index fe030f80c..c6626b299 100644
--- a/gobblin-iceberg/build.gradle
+++ b/gobblin-iceberg/build.gradle
@@ -48,7 +48,7 @@ dependencies {
     compile externalDependency.findBugsAnnotations
     compile externalDependency.avroMapredH2
 
-    testCompile(group: 'org.apache.iceberg', name: 'iceberg-hive-metastore', 
version: '0.11.1', classifier: 'tests') {
+    testCompile(externalDependency.icebergHiveMetastoreTest) {
         transitive = false
     }
     testCompile('org.apache.hadoop:hadoop-common:2.6.0')
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/IcebergUtils.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/IcebergUtils.java
index 65fb551ec..c7b528cb5 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/IcebergUtils.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/IcebergUtils.java
@@ -248,6 +248,8 @@ public class IcebergUtils {
         
IcebergUtils.getMapFromIntegerLongPairs(file.getFileMetrics().getColumnSizes(), 
schemaIdMap),
         
IcebergUtils.getMapFromIntegerLongPairs(file.getFileMetrics().getValueCounts(), 
schemaIdMap),
         
IcebergUtils.getMapFromIntegerLongPairs(file.getFileMetrics().getNullValueCounts(),
 schemaIdMap),
+        // TODO: If required, handle NaN value count File metric conversion in 
ORC metrics with iceberg upgrade
+        IcebergUtils.getMapFromIntegerLongPairs(Lists.newArrayList(), 
schemaIdMap), // metric value will be null since Nan values are supported from 
avro version 1.10.*
         
IcebergUtils.getMapFromIntegerBytesPairs(file.getFileMetrics().getLowerBounds(),
 schemaIdMap),
         
IcebergUtils.getMapFromIntegerBytesPairs(file.getFileMetrics().getUpperBounds(),
 schemaIdMap));
     return dataFileBuilder.withMetrics(metrics).build();
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index 47616529f..ffeefaa08 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -219,7 +219,7 @@ public class GobblinMCEPublisher extends DataPublisher {
       }
       case AVRO: {
         try {
-          return new Metrics(100000000L, null, null, null);
+          return new Metrics(100000000L, null, null, null, null);
         } catch (Exception e) {
           throw new RuntimeException("Cannot get file information for file " + 
path.toString(), e);
         }
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 9c3d30e9e..ecd528323 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
 import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.ExpireSnapshots;
@@ -77,7 +78,7 @@ import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.hive.HiveCatalogs;
+import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.types.Types;
 import org.joda.time.DateTime;
 import org.joda.time.format.PeriodFormatter;
@@ -253,7 +254,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
   }
 
   protected void initializeCatalog() {
-    catalog = HiveCatalogs.loadCatalog(conf);
+    catalog = CatalogUtil.loadCatalog(HiveCatalog.class.getName(), 
"HiveCatalog", new HashMap<>(), conf);
   }
 
   private org.apache.iceberg.Table getIcebergTable(TableIdentifier tid) throws 
NoSuchTableException {
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index 4225ce5ce..f50bc81aa 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -128,7 +128,6 @@ public class HiveMetadataWriterTest extends 
HiveMetastoreTest {
   @BeforeSuite
   public void setUp() throws Exception {
     Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
-    startMetastore();
     State state = 
ConfigUtils.configToState(ConfigUtils.propertiesToConfig(hiveConf.getAllProperties()));
     Optional<String> metastoreUri = 
Optional.fromNullable(state.getProperties().getProperty(HiveRegister.HIVE_METASTORE_URI_KEY));
     hc = HiveMetastoreClientPool.get(state.getProperties(), metastoreUri);
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index bc1c8ca57..bf7dcb383 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -124,8 +124,6 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
   @BeforeClass
   public void setUp() throws Exception {
     Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
-    startMetastore();
-
     tmpDir = Files.createTempDir();
     hourlyDataFile_1 = new File(tmpDir, 
"testDB/testTopic/hourly/2020/03/17/08/data.avro");
     Files.createParentDirs(hourlyDataFile_1);
@@ -236,7 +234,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     gobblinMCEWriter.flush();
     table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), 
"0-2000");
-    Assert.assertEquals(table.currentSnapshot().allManifests().size(), 1);
+    
Assert.assertEquals(table.currentSnapshot().allManifests(table.io()).size(), 1);
     // Assert low watermark and high watermark set properly
     
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
 "9");
     
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
 "20");
@@ -256,7 +254,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     gobblinMCEWriter.flush();
     table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), 
"0-3000");
-    Assert.assertEquals(table.currentSnapshot().allManifests().size(), 2);
+    
Assert.assertEquals(table.currentSnapshot().allManifests(table.io()).size(), 2);
     
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
 "20");
     
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
 "30");
 
@@ -269,7 +267,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     gobblinMCEWriter.flush();
     table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), 
"0-3000");
-    Assert.assertEquals(table.currentSnapshot().allManifests().size(), 2);
+    
Assert.assertEquals(table.currentSnapshot().allManifests(table.io()).size(), 2);
   }
 
   //Make sure hive test execute later and close the metastore
@@ -290,7 +288,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     Table table = 
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     Iterator<org.apache.iceberg.DataFile>
         result = 
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", 
filePath_1)).collect().iterator();
-    Assert.assertEquals(table.currentSnapshot().allManifests().size(), 2);
+    
Assert.assertEquals(table.currentSnapshot().allManifests(table.io()).size(), 2);
     Assert.assertTrue(result.hasNext());
     GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(), 
gmce);
     gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
@@ -313,7 +311,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
   public void testChangeProperty() throws IOException {
     Table table = 
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), 
"0-3000");
-    Assert.assertEquals(table.currentSnapshot().allManifests().size(), 3);
+    
Assert.assertEquals(table.currentSnapshot().allManifests(table.io()).size(), 3);
     
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
 "30");
     
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
 "40");
 
@@ -335,7 +333,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     // Assert the offset has been updated
     Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), 
"0-4000");
-    Assert.assertEquals(table.currentSnapshot().allManifests().size(), 3);
+    
Assert.assertEquals(table.currentSnapshot().allManifests(table.io()).size(), 3);
     // Assert low watermark and high watermark set properly
     
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
 "40");
     
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
 "45");
@@ -367,11 +365,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(),
 1);
     Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
         .get(new File(tmpDir, "testDB/testTopic").getAbsolutePath())
-        .get("hivedb.testTopic").get(0).lowWatermark, 50L);
-    Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
-        .get(new File(tmpDir, "testDB/testTopic").getAbsolutePath())
-        .get("hivedb.testTopic").get(0).highWatermark, 52L);
-
+        .get("hivedb.testTopicCompleteness").get(0).getMessage(), "failed to 
flush table hivedb, testTopicCompleteness");
     // No events sent yet since the topic has not been flushed
     Assert.assertEquals(eventsSent.size(), 0);
 
@@ -381,7 +375,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     // Since this topic has been flushed, there should be an event sent for 
previous failure, and the table
     // should be removed from the error map
     Assert.assertEquals(eventsSent.size(), 1);
-    
Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.TABLE_NAME_KEY),
 "testTopic");
+    
Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.TABLE_NAME_KEY),
 "testTopicCompleteness");
     
Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.GMCE_LOW_WATERMARK),
 "50");
     
Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.GMCE_HIGH_WATERMARK),
 "52");
     
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(),
 0);
@@ -401,7 +395,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     // Creating a copy of gmce with static type in GenericRecord to work with 
writeEnvelop method
     // without risking running into type cast runtime error.
     gmce.setOperationType(OperationType.add_files);
-    File hourlyFile = new File(tmpDir, 
"testDB/testTopic/hourly/2021/09/16/10/data.avro");
+    File hourlyFile = new File(tmpDir, 
"testDB/testTopicCompleteness/hourly/2021/09/16/10/data.avro");
     long timestampMillis = 1631811600000L;
     Files.createParentDirs(hourlyFile);
     writeRecord(hourlyFile);
@@ -410,25 +404,23 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
         .setFileFormat("avro")
         .setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
         .build()));
-    gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, 
String>builder().put("testTopic-1", "3000-4000").build());
+    gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, 
String>builder().put("testTopicCompleteness-1", "3000-4000").build());
     GenericRecord genericGmce_3000_4000 = 
GenericData.get().deepCopy(gmce.getSchema(), gmce);
     gobblinMCEWriterWithCompletness.writeEnvelope(new 
RecordEnvelope<>(genericGmce_3000_4000,
         new KafkaStreamingExtractor.KafkaWatermark(
             new 
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(50L))));
-
-    Table table = 
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
-    Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), 
"0-4000");
+    Table table = 
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(1));
     Assert.assertTrue(table.spec().fields().size() == 2);
     Assert.assertEquals(table.spec().fields().get(1).name(), "late");
 
     // Test when completeness watermark = -1 bootstrap case
     KafkaAuditCountVerifier verifier = 
Mockito.mock(TestAuditCountVerifier.class);
-    Mockito.when(verifier.isComplete("testTopic", timestampMillis - 
TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
+    Mockito.when(verifier.isComplete("testTopicCompleteness", timestampMillis 
- TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
     IcebergMetadataWriter imw = (IcebergMetadataWriter) 
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next();
     imw.setAuditCountVerifier(verifier);
     gobblinMCEWriterWithCompletness.flush();
-    table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+    table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(1));
     //completeness watermark = "2020-09-16-10"
     Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
     
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), 
"America/Los_Angeles");
@@ -440,7 +432,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     Assert.assertTrue(dfl.hasNext());
 
     // Test when completeness watermark is still "2021-09-16-10" but have a 
late file for "2021-09-16-09"
-    File hourlyFile1 = new File(tmpDir, 
"testDB/testTopic/hourly/2021/09/16/09/data1.avro");
+    File hourlyFile1 = new File(tmpDir, 
"testDB/testTopicCompleteness/hourly/2021/09/16/09/data1.avro");
     Files.createParentDirs(hourlyFile1);
     writeRecord(hourlyFile1);
     gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
@@ -448,14 +440,14 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
         .setFileFormat("avro")
         .setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
         .build()));
-    gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, 
String>builder().put("testTopic-1", "4000-5000").build());
+    gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, 
String>builder().put("testTopicCompleteness-1", "4000-5000").build());
     GenericRecord genericGmce_4000_5000 = 
GenericData.get().deepCopy(gmce.getSchema(), gmce);
     gobblinMCEWriterWithCompletness.writeEnvelope(new 
RecordEnvelope<>(genericGmce_4000_5000,
         new KafkaStreamingExtractor.KafkaWatermark(
             new 
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(55L))));
     gobblinMCEWriterWithCompletness.flush();
-    table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+    table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(1));
     Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), 
String.valueOf(timestampMillis));
 
     dfl = 
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", 
hourlyFile1.getAbsolutePath())).collect().iterator();
@@ -463,7 +455,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     Assert.assertEquals((int) dfl.next().partition().get(1, Integer.class), 1);
 
     // Test when completeness watermark will advance to "2021-09-16-11"
-    File hourlyFile2 = new File(tmpDir, 
"testDB/testTopic/hourly/2021/09/16/11/data.avro");
+    File hourlyFile2 = new File(tmpDir, 
"testDB/testTopicCompleteness/hourly/2021/09/16/11/data.avro");
     long timestampMillis1 = timestampMillis + TimeUnit.HOURS.toMillis(1);
     Files.createParentDirs(hourlyFile2);
     writeRecord(hourlyFile2);
@@ -472,16 +464,16 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
         .setFileFormat("avro")
         .setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
         .build()));
-    gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, 
String>builder().put("testTopic-1", "5000-6000").build());
+    gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, 
String>builder().put("testTopicCompleteness-1", "5000-6000").build());
     GenericRecord genericGmce_5000_6000 = 
GenericData.get().deepCopy(gmce.getSchema(), gmce);
     gobblinMCEWriterWithCompletness.writeEnvelope(new 
RecordEnvelope<>(genericGmce_5000_6000,
         new KafkaStreamingExtractor.KafkaWatermark(
             new 
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(60L))));
 
-    Mockito.when(verifier.isComplete("testTopic", timestampMillis1 - 
TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
+    Mockito.when(verifier.isComplete("testTopicCompleteness", timestampMillis1 
- TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
     gobblinMCEWriterWithCompletness.flush();
-    table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+    table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(1));
     Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), 
String.valueOf(timestampMillis1));
     // watermark 1631815200000L correspond to 2021-09-16-11 in PT
     
Assert.assertEquals(imw.state.getPropAsLong(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
 table.name().toLowerCase(Locale.ROOT))), 1631815200000L);
@@ -498,7 +490,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     ZonedDateTime expectedCWDt = 
ZonedDateTime.now(ZoneId.of(DEFAULT_TIME_ZONE)).truncatedTo(ChronoUnit.HOURS);
     // For quiet topics, watermark should always be beginning of current hour
     long expectedWatermark = expectedCWDt.toInstant().toEpochMilli();
-    File hourlyFile2 = new File(tmpDir, 
"testDB/testTopic/hourly/2021/09/16/11/data.avro");
+    File hourlyFile2 = new File(tmpDir, 
"testDB/testTopicCompleteness/hourly/2021/09/16/11/data.avro");
     gmce.setOldFilePrefixes(null);
     gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
         .setFilePath(hourlyFile2.toString())
@@ -506,7 +498,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
         .setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
         .build()));
     gmce.setOperationType(OperationType.change_property);
-    gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, 
String>builder().put("testTopic-1", "6000-7000").build());
+    gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, 
String>builder().put("testTopicCompleteness-1", "6000-7000").build());
     GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(), 
gmce);
     gobblinMCEWriterWithCompletness.writeEnvelope(new 
RecordEnvelope<>(genericGmce,
         new KafkaStreamingExtractor.KafkaWatermark(
@@ -515,12 +507,12 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
 
     KafkaAuditCountVerifier verifier = 
Mockito.mock(TestAuditCountVerifier.class);
     // For quiet topics always check for previous hour window
-    Mockito.when(verifier.isComplete("testTopic", 
expectedCWDt.minusHours(1).toInstant().toEpochMilli(), 
expectedWatermark)).thenReturn(true);
+    Mockito.when(verifier.isComplete("testTopicCompleteness", 
expectedCWDt.minusHours(1).toInstant().toEpochMilli(), 
expectedWatermark)).thenReturn(true);
     ((IcebergMetadataWriter) 
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
     gobblinMCEWriterWithCompletness.flush();
 
-    Table table = 
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
-    Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), 
"0-7000");
+    Table table = 
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(1));
+    
Assert.assertEquals(table.properties().get("offset.range.testTopicCompleteness-1"),
 "3000-7000");
     Assert.assertEquals(table.spec().fields().get(1).name(), "late");
     Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
     
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), 
"America/Los_Angeles");
@@ -562,15 +554,20 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
         partitionValue = "2020-03-17-00";
       }
       return Optional.of(new 
HivePartition.Builder().withPartitionValues(Lists.newArrayList(partitionValue))
-          .withDbName("hivedb").withTableName("testTopic").build());
+          .withDbName("hivedb").withTableName(table.getTableName()).build());
     }
     @Override
     protected List<HiveTable> getTables(Path path) throws IOException {
       List<HiveTable> tables = super.getTables(path);
       for (HiveTable table : tables) {
-        table.setPartitionKeys(ImmutableList.<HiveRegistrationUnit.Column>of(
-            new HiveRegistrationUnit.Column("datepartition", 
serdeConstants.STRING_TYPE_NAME, StringUtils.EMPTY)));
-        //table.setLocation(tmpDir.getAbsolutePath());
+        if (table.getTableName().equals("testTopicCompleteness")) {
+          table.setPartitionKeys(ImmutableList.<HiveRegistrationUnit.Column>of(
+              new HiveRegistrationUnit.Column("datepartition", 
serdeConstants.STRING_TYPE_NAME, StringUtils.EMPTY)
+              , new HiveRegistrationUnit.Column("late", 
serdeConstants.INT_TYPE_NAME, StringUtils.EMPTY)));
+        } else {
+          
table.setPartitionKeys(ImmutableList.<HiveRegistrationUnit.Column>of(new 
HiveRegistrationUnit.Column("datepartition", serdeConstants.STRING_TYPE_NAME, 
StringUtils.EMPTY)));
+          //table.setLocation(tmpDir.getAbsolutePath());
+        }
       }
       return tables;
     }
@@ -581,6 +578,9 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
       if (path.toString().contains("testFaultTolerant")) {
         return Lists.newArrayList("testFaultTolerantIcebergTable");
       }
+      else if (path.toString().contains("testTopicCompleteness")) {
+        return Lists.newArrayList("testTopicCompleteness");
+      }
       return Lists.newArrayList("testTopic");
     }
   }
diff --git a/gradle/scripts/defaultBuildProperties.gradle 
b/gradle/scripts/defaultBuildProperties.gradle
index 950c8780f..f7b5f9cb1 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -31,7 +31,7 @@ def BuildProperties BUILD_PROPERTIES = new 
BuildProperties(project)
     .register(new BuildProperty("gobblinFlavor", "standard", "Build flavor 
(see http://gobblin.readthedocs.io/en/latest/developer-guide/GobblinModules/)"))
     .register(new BuildProperty("hadoopVersion", "2.10.0", "Hadoop 
dependencies version"))
     .register(new BuildProperty("hiveVersion", "1.0.1-avro", "Hive 
dependencies version"))
-    .register(new BuildProperty("icebergVersion", "0.11.1", "Iceberg 
dependencies version"))
+    .register(new BuildProperty("icebergVersion", "1.2.0", "Iceberg 
dependencies version"))
     .register(new BuildProperty("jdkVersion", 
JavaVersion.VERSION_1_8.toString(),
     "Java languange compatibility; supported versions: " + 
JavaVersion.VERSION_1_8))
     .register(new BuildProperty("kafka08Version", "0.8.2.2", "Kafka 0.8 
dependencies version"))
diff --git a/gradle/scripts/dependencyDefinitions.gradle 
b/gradle/scripts/dependencyDefinitions.gradle
index 95f8f6c32..9c34ef028 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -82,6 +82,7 @@ ext.externalDependency = [
     "httpcore": "org.apache.httpcomponents:httpcore:4.4.11",
     "httpasyncclient": "org.apache.httpcomponents:httpasyncclient:4.1.3",
     "icebergHive": "org.apache.iceberg:iceberg-hive-runtime:" + icebergVersion,
+    "icebergHiveMetastoreTest": "org.apache.iceberg:iceberg-hive-metastore:" + 
icebergVersion + ":tests",
     "jgit": "org.eclipse.jgit:org.eclipse.jgit:5.1.1.201809181055-r",
     "jmh": "org.openjdk.jmh:jmh-core:1.17.3",
     "jmhAnnotations": "org.openjdk.jmh:jmh-generator-annprocess:1.17.3",

Reply via email to