This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 949b01927 [GOBBLIN-1835]Upgrade Iceberg Version from 0.11.1 to 1.2.0
(#3697)
949b01927 is described below
commit 949b01927e642a40a1f5c208926925988910c064
Author: meethngala <[email protected]>
AuthorDate: Wed Jun 14 16:39:12 2023 -0700
[GOBBLIN-1835]Upgrade Iceberg Version from 0.11.1 to 1.2.0 (#3697)
* upgrade iceberg version to 1.2.0
* address PR comments
* resolve merge conflicts
* fix checkstyle
* update assert to verify error map elements in testFaultTolerant unit test
---------
Co-authored-by: Meeth Gala <[email protected]>
---
gobblin-data-management/build.gradle | 2 +-
.../data/management/copy/iceberg/IcebergTable.java | 2 +-
gobblin-iceberg/build.gradle | 2 +-
.../apache/gobblin/iceberg/Utils/IcebergUtils.java | 2 +
.../iceberg/publisher/GobblinMCEPublisher.java | 2 +-
.../iceberg/writer/IcebergMetadataWriter.java | 5 +-
.../iceberg/writer/HiveMetadataWriterTest.java | 1 -
.../iceberg/writer/IcebergMetadataWriterTest.java | 74 +++++++++++-----------
gradle/scripts/defaultBuildProperties.gradle | 2 +-
gradle/scripts/dependencyDefinitions.gradle | 1 +
10 files changed, 48 insertions(+), 45 deletions(-)
diff --git a/gobblin-data-management/build.gradle
b/gobblin-data-management/build.gradle
index c3b3129ea..8b25f8562 100644
--- a/gobblin-data-management/build.gradle
+++ b/gobblin-data-management/build.gradle
@@ -45,7 +45,7 @@ dependencies {
compile externalDependency.junit
compile externalDependency.jacksonMapperAsl
- testCompile(group: 'org.apache.iceberg', name: 'iceberg-hive-metastore',
version: '0.11.1', classifier: 'tests') {
+ testCompile(externalDependency.icebergHiveMetastoreTest) {
transitive = false
}
testCompile('org.apache.hadoop:hadoop-common:2.6.0')
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index 6671ebdeb..529f53a45 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -161,7 +161,7 @@ public class IcebergTable {
metadataFileLocation,
snapshot.manifestListLocation(),
// NOTE: unable to `.stream().map(m -> calcManifestFileInfo(m,
tableOps.io()))` due to checked exception
- skipManifestFileInfo ? Lists.newArrayList() :
calcAllManifestFileInfos(snapshot.allManifests(), tableOps.io())
+ skipManifestFileInfo ? Lists.newArrayList() :
calcAllManifestFileInfos(snapshot.allManifests(tableOps.io()), tableOps.io())
);
}
diff --git a/gobblin-iceberg/build.gradle b/gobblin-iceberg/build.gradle
index fe030f80c..c6626b299 100644
--- a/gobblin-iceberg/build.gradle
+++ b/gobblin-iceberg/build.gradle
@@ -48,7 +48,7 @@ dependencies {
compile externalDependency.findBugsAnnotations
compile externalDependency.avroMapredH2
- testCompile(group: 'org.apache.iceberg', name: 'iceberg-hive-metastore',
version: '0.11.1', classifier: 'tests') {
+ testCompile(externalDependency.icebergHiveMetastoreTest) {
transitive = false
}
testCompile('org.apache.hadoop:hadoop-common:2.6.0')
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/IcebergUtils.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/IcebergUtils.java
index 65fb551ec..c7b528cb5 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/IcebergUtils.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/Utils/IcebergUtils.java
@@ -248,6 +248,8 @@ public class IcebergUtils {
IcebergUtils.getMapFromIntegerLongPairs(file.getFileMetrics().getColumnSizes(),
schemaIdMap),
IcebergUtils.getMapFromIntegerLongPairs(file.getFileMetrics().getValueCounts(),
schemaIdMap),
IcebergUtils.getMapFromIntegerLongPairs(file.getFileMetrics().getNullValueCounts(),
schemaIdMap),
+ // TODO: If required, handle NaN value count File metric conversion in
ORC metrics with iceberg upgrade
+ IcebergUtils.getMapFromIntegerLongPairs(Lists.newArrayList(),
schemaIdMap), // metric value will be null since Nan values are supported from
avro version 1.10.*
IcebergUtils.getMapFromIntegerBytesPairs(file.getFileMetrics().getLowerBounds(),
schemaIdMap),
IcebergUtils.getMapFromIntegerBytesPairs(file.getFileMetrics().getUpperBounds(),
schemaIdMap));
return dataFileBuilder.withMetrics(metrics).build();
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index 47616529f..ffeefaa08 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -219,7 +219,7 @@ public class GobblinMCEPublisher extends DataPublisher {
}
case AVRO: {
try {
- return new Metrics(100000000L, null, null, null);
+ return new Metrics(100000000L, null, null, null, null);
} catch (Exception e) {
throw new RuntimeException("Cannot get file information for file " +
path.toString(), e);
}
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 9c3d30e9e..ecd528323 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.ExpireSnapshots;
@@ -77,7 +78,7 @@ import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.hive.HiveCatalogs;
+import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.types.Types;
import org.joda.time.DateTime;
import org.joda.time.format.PeriodFormatter;
@@ -253,7 +254,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
}
protected void initializeCatalog() {
- catalog = HiveCatalogs.loadCatalog(conf);
+ catalog = CatalogUtil.loadCatalog(HiveCatalog.class.getName(),
"HiveCatalog", new HashMap<>(), conf);
}
private org.apache.iceberg.Table getIcebergTable(TableIdentifier tid) throws
NoSuchTableException {
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index 4225ce5ce..f50bc81aa 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -128,7 +128,6 @@ public class HiveMetadataWriterTest extends
HiveMetastoreTest {
@BeforeSuite
public void setUp() throws Exception {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
- startMetastore();
State state =
ConfigUtils.configToState(ConfigUtils.propertiesToConfig(hiveConf.getAllProperties()));
Optional<String> metastoreUri =
Optional.fromNullable(state.getProperties().getProperty(HiveRegister.HIVE_METASTORE_URI_KEY));
hc = HiveMetastoreClientPool.get(state.getProperties(), metastoreUri);
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index bc1c8ca57..bf7dcb383 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -124,8 +124,6 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
@BeforeClass
public void setUp() throws Exception {
Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance();
- startMetastore();
-
tmpDir = Files.createTempDir();
hourlyDataFile_1 = new File(tmpDir,
"testDB/testTopic/hourly/2020/03/17/08/data.avro");
Files.createParentDirs(hourlyDataFile_1);
@@ -236,7 +234,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
gobblinMCEWriter.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertEquals(table.properties().get("offset.range.testTopic-1"),
"0-2000");
- Assert.assertEquals(table.currentSnapshot().allManifests().size(), 1);
+
Assert.assertEquals(table.currentSnapshot().allManifests(table.io()).size(), 1);
// Assert low watermark and high watermark set properly
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
"9");
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
"20");
@@ -256,7 +254,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
gobblinMCEWriter.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertEquals(table.properties().get("offset.range.testTopic-1"),
"0-3000");
- Assert.assertEquals(table.currentSnapshot().allManifests().size(), 2);
+
Assert.assertEquals(table.currentSnapshot().allManifests(table.io()).size(), 2);
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
"20");
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
"30");
@@ -269,7 +267,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
gobblinMCEWriter.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertEquals(table.properties().get("offset.range.testTopic-1"),
"0-3000");
- Assert.assertEquals(table.currentSnapshot().allManifests().size(), 2);
+
Assert.assertEquals(table.currentSnapshot().allManifests(table.io()).size(), 2);
}
//Make sure hive test execute later and close the metastore
@@ -290,7 +288,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Table table =
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Iterator<org.apache.iceberg.DataFile>
result =
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path",
filePath_1)).collect().iterator();
- Assert.assertEquals(table.currentSnapshot().allManifests().size(), 2);
+
Assert.assertEquals(table.currentSnapshot().allManifests(table.io()).size(), 2);
Assert.assertTrue(result.hasNext());
GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(),
gmce);
gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(genericGmce,
@@ -313,7 +311,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
public void testChangeProperty() throws IOException {
Table table =
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertEquals(table.properties().get("offset.range.testTopic-1"),
"0-3000");
- Assert.assertEquals(table.currentSnapshot().allManifests().size(), 3);
+
Assert.assertEquals(table.currentSnapshot().allManifests(table.io()).size(), 3);
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
"30");
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
"40");
@@ -335,7 +333,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
// Assert the offset has been updated
Assert.assertEquals(table.properties().get("offset.range.testTopic-1"),
"0-4000");
- Assert.assertEquals(table.currentSnapshot().allManifests().size(), 3);
+
Assert.assertEquals(table.currentSnapshot().allManifests(table.io()).size(), 3);
// Assert low watermark and high watermark set properly
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
"40");
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
"45");
@@ -367,11 +365,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(),
1);
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
.get(new File(tmpDir, "testDB/testTopic").getAbsolutePath())
- .get("hivedb.testTopic").get(0).lowWatermark, 50L);
- Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap()
- .get(new File(tmpDir, "testDB/testTopic").getAbsolutePath())
- .get("hivedb.testTopic").get(0).highWatermark, 52L);
-
+ .get("hivedb.testTopicCompleteness").get(0).getMessage(), "failed to
flush table hivedb, testTopicCompleteness");
// No events sent yet since the topic has not been flushed
Assert.assertEquals(eventsSent.size(), 0);
@@ -381,7 +375,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
// Since this topic has been flushed, there should be an event sent for
previous failure, and the table
// should be removed from the error map
Assert.assertEquals(eventsSent.size(), 1);
-
Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.TABLE_NAME_KEY),
"testTopic");
+
Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.TABLE_NAME_KEY),
"testTopicCompleteness");
Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.GMCE_LOW_WATERMARK),
"50");
Assert.assertEquals(eventsSent.get(0).getMetadata().get(MetadataWriterKeys.GMCE_HIGH_WATERMARK),
"52");
Assert.assertEquals(gobblinMCEWriter.getDatasetErrorMap().values().iterator().next().size(),
0);
@@ -401,7 +395,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
// Creating a copy of gmce with static type in GenericRecord to work with
writeEnvelop method
// without risking running into type cast runtime error.
gmce.setOperationType(OperationType.add_files);
- File hourlyFile = new File(tmpDir,
"testDB/testTopic/hourly/2021/09/16/10/data.avro");
+ File hourlyFile = new File(tmpDir,
"testDB/testTopicCompleteness/hourly/2021/09/16/10/data.avro");
long timestampMillis = 1631811600000L;
Files.createParentDirs(hourlyFile);
writeRecord(hourlyFile);
@@ -410,25 +404,23 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
.setFileFormat("avro")
.setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
.build()));
- gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "3000-4000").build());
+ gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopicCompleteness-1", "3000-4000").build());
GenericRecord genericGmce_3000_4000 =
GenericData.get().deepCopy(gmce.getSchema(), gmce);
gobblinMCEWriterWithCompletness.writeEnvelope(new
RecordEnvelope<>(genericGmce_3000_4000,
new KafkaStreamingExtractor.KafkaWatermark(
new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(50L))));
-
- Table table =
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
- Assert.assertEquals(table.properties().get("offset.range.testTopic-1"),
"0-4000");
+ Table table =
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(1));
Assert.assertTrue(table.spec().fields().size() == 2);
Assert.assertEquals(table.spec().fields().get(1).name(), "late");
// Test when completeness watermark = -1 bootstrap case
KafkaAuditCountVerifier verifier =
Mockito.mock(TestAuditCountVerifier.class);
- Mockito.when(verifier.isComplete("testTopic", timestampMillis -
TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
+ Mockito.when(verifier.isComplete("testTopicCompleteness", timestampMillis
- TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
IcebergMetadataWriter imw = (IcebergMetadataWriter)
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next();
imw.setAuditCountVerifier(verifier);
gobblinMCEWriterWithCompletness.flush();
- table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+ table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(1));
//completeness watermark = "2020-09-16-10"
Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY),
"America/Los_Angeles");
@@ -440,7 +432,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Assert.assertTrue(dfl.hasNext());
// Test when completeness watermark is still "2021-09-16-10" but have a
late file for "2021-09-16-09"
- File hourlyFile1 = new File(tmpDir,
"testDB/testTopic/hourly/2021/09/16/09/data1.avro");
+ File hourlyFile1 = new File(tmpDir,
"testDB/testTopicCompleteness/hourly/2021/09/16/09/data1.avro");
Files.createParentDirs(hourlyFile1);
writeRecord(hourlyFile1);
gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
@@ -448,14 +440,14 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
.setFileFormat("avro")
.setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
.build()));
- gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "4000-5000").build());
+ gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopicCompleteness-1", "4000-5000").build());
GenericRecord genericGmce_4000_5000 =
GenericData.get().deepCopy(gmce.getSchema(), gmce);
gobblinMCEWriterWithCompletness.writeEnvelope(new
RecordEnvelope<>(genericGmce_4000_5000,
new KafkaStreamingExtractor.KafkaWatermark(
new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(55L))));
gobblinMCEWriterWithCompletness.flush();
- table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+ table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(1));
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY),
String.valueOf(timestampMillis));
dfl =
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path",
hourlyFile1.getAbsolutePath())).collect().iterator();
@@ -463,7 +455,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Assert.assertEquals((int) dfl.next().partition().get(1, Integer.class), 1);
// Test when completeness watermark will advance to "2021-09-16-11"
- File hourlyFile2 = new File(tmpDir,
"testDB/testTopic/hourly/2021/09/16/11/data.avro");
+ File hourlyFile2 = new File(tmpDir,
"testDB/testTopicCompleteness/hourly/2021/09/16/11/data.avro");
long timestampMillis1 = timestampMillis + TimeUnit.HOURS.toMillis(1);
Files.createParentDirs(hourlyFile2);
writeRecord(hourlyFile2);
@@ -472,16 +464,16 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
.setFileFormat("avro")
.setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
.build()));
- gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "5000-6000").build());
+ gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopicCompleteness-1", "5000-6000").build());
GenericRecord genericGmce_5000_6000 =
GenericData.get().deepCopy(gmce.getSchema(), gmce);
gobblinMCEWriterWithCompletness.writeEnvelope(new
RecordEnvelope<>(genericGmce_5000_6000,
new KafkaStreamingExtractor.KafkaWatermark(
new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(60L))));
- Mockito.when(verifier.isComplete("testTopic", timestampMillis1 -
TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
+ Mockito.when(verifier.isComplete("testTopicCompleteness", timestampMillis1
- TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
gobblinMCEWriterWithCompletness.flush();
- table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+ table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(1));
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY),
String.valueOf(timestampMillis1));
// watermark 1631815200000L correspond to 2021-09-16-11 in PT
Assert.assertEquals(imw.state.getPropAsLong(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
table.name().toLowerCase(Locale.ROOT))), 1631815200000L);
@@ -498,7 +490,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
ZonedDateTime expectedCWDt =
ZonedDateTime.now(ZoneId.of(DEFAULT_TIME_ZONE)).truncatedTo(ChronoUnit.HOURS);
// For quiet topics, watermark should always be beginning of current hour
long expectedWatermark = expectedCWDt.toInstant().toEpochMilli();
- File hourlyFile2 = new File(tmpDir,
"testDB/testTopic/hourly/2021/09/16/11/data.avro");
+ File hourlyFile2 = new File(tmpDir,
"testDB/testTopicCompleteness/hourly/2021/09/16/11/data.avro");
gmce.setOldFilePrefixes(null);
gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
.setFilePath(hourlyFile2.toString())
@@ -506,7 +498,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
.setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
.build()));
gmce.setOperationType(OperationType.change_property);
- gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "6000-7000").build());
+ gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopicCompleteness-1", "6000-7000").build());
GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(),
gmce);
gobblinMCEWriterWithCompletness.writeEnvelope(new
RecordEnvelope<>(genericGmce,
new KafkaStreamingExtractor.KafkaWatermark(
@@ -515,12 +507,12 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
KafkaAuditCountVerifier verifier =
Mockito.mock(TestAuditCountVerifier.class);
// For quiet topics always check for previous hour window
- Mockito.when(verifier.isComplete("testTopic",
expectedCWDt.minusHours(1).toInstant().toEpochMilli(),
expectedWatermark)).thenReturn(true);
+ Mockito.when(verifier.isComplete("testTopicCompleteness",
expectedCWDt.minusHours(1).toInstant().toEpochMilli(),
expectedWatermark)).thenReturn(true);
((IcebergMetadataWriter)
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
gobblinMCEWriterWithCompletness.flush();
- Table table =
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
- Assert.assertEquals(table.properties().get("offset.range.testTopic-1"),
"0-7000");
+ Table table =
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(1));
+
Assert.assertEquals(table.properties().get("offset.range.testTopicCompleteness-1"),
"3000-7000");
Assert.assertEquals(table.spec().fields().get(1).name(), "late");
Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY),
"America/Los_Angeles");
@@ -562,15 +554,20 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
partitionValue = "2020-03-17-00";
}
return Optional.of(new
HivePartition.Builder().withPartitionValues(Lists.newArrayList(partitionValue))
- .withDbName("hivedb").withTableName("testTopic").build());
+ .withDbName("hivedb").withTableName(table.getTableName()).build());
}
@Override
protected List<HiveTable> getTables(Path path) throws IOException {
List<HiveTable> tables = super.getTables(path);
for (HiveTable table : tables) {
- table.setPartitionKeys(ImmutableList.<HiveRegistrationUnit.Column>of(
- new HiveRegistrationUnit.Column("datepartition",
serdeConstants.STRING_TYPE_NAME, StringUtils.EMPTY)));
- //table.setLocation(tmpDir.getAbsolutePath());
+ if (table.getTableName().equals("testTopicCompleteness")) {
+ table.setPartitionKeys(ImmutableList.<HiveRegistrationUnit.Column>of(
+ new HiveRegistrationUnit.Column("datepartition",
serdeConstants.STRING_TYPE_NAME, StringUtils.EMPTY)
+ , new HiveRegistrationUnit.Column("late",
serdeConstants.INT_TYPE_NAME, StringUtils.EMPTY)));
+ } else {
+
table.setPartitionKeys(ImmutableList.<HiveRegistrationUnit.Column>of(new
HiveRegistrationUnit.Column("datepartition", serdeConstants.STRING_TYPE_NAME,
StringUtils.EMPTY)));
+ //table.setLocation(tmpDir.getAbsolutePath());
+ }
}
return tables;
}
@@ -581,6 +578,9 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
if (path.toString().contains("testFaultTolerant")) {
return Lists.newArrayList("testFaultTolerantIcebergTable");
}
+ else if (path.toString().contains("testTopicCompleteness")) {
+ return Lists.newArrayList("testTopicCompleteness");
+ }
return Lists.newArrayList("testTopic");
}
}
diff --git a/gradle/scripts/defaultBuildProperties.gradle
b/gradle/scripts/defaultBuildProperties.gradle
index 950c8780f..f7b5f9cb1 100644
--- a/gradle/scripts/defaultBuildProperties.gradle
+++ b/gradle/scripts/defaultBuildProperties.gradle
@@ -31,7 +31,7 @@ def BuildProperties BUILD_PROPERTIES = new
BuildProperties(project)
.register(new BuildProperty("gobblinFlavor", "standard", "Build flavor
(see http://gobblin.readthedocs.io/en/latest/developer-guide/GobblinModules/)"))
.register(new BuildProperty("hadoopVersion", "2.10.0", "Hadoop
dependencies version"))
.register(new BuildProperty("hiveVersion", "1.0.1-avro", "Hive
dependencies version"))
- .register(new BuildProperty("icebergVersion", "0.11.1", "Iceberg
dependencies version"))
+ .register(new BuildProperty("icebergVersion", "1.2.0", "Iceberg
dependencies version"))
.register(new BuildProperty("jdkVersion",
JavaVersion.VERSION_1_8.toString(),
"Java languange compatibility; supported versions: " +
JavaVersion.VERSION_1_8))
.register(new BuildProperty("kafka08Version", "0.8.2.2", "Kafka 0.8
dependencies version"))
diff --git a/gradle/scripts/dependencyDefinitions.gradle
b/gradle/scripts/dependencyDefinitions.gradle
index 95f8f6c32..9c34ef028 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -82,6 +82,7 @@ ext.externalDependency = [
"httpcore": "org.apache.httpcomponents:httpcore:4.4.11",
"httpasyncclient": "org.apache.httpcomponents:httpasyncclient:4.1.3",
"icebergHive": "org.apache.iceberg:iceberg-hive-runtime:" + icebergVersion,
+ "icebergHiveMetastoreTest": "org.apache.iceberg:iceberg-hive-metastore:" +
icebergVersion + ":tests",
"jgit": "org.eclipse.jgit:org.eclipse.jgit:5.1.1.201809181055-r",
"jmh": "org.openjdk.jmh:jmh-core:1.17.3",
"jmhAnnotations": "org.openjdk.jmh:jmh-generator-annprocess:1.17.3",