This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e4d6c1b [GOBBLIN-1443] Make iceberg metadata root location include db name
e4d6c1b is described below
commit e4d6c1b7542f3af5237447021ff14295d2c425ff
Author: Zihan Li <[email protected]>
AuthorDate: Sun May 16 08:31:54 2021 -0700
[GOBBLIN-1443] Make iceberg metadata root location include db name
Closes #3279 from ZihanLi58/GOBBLIN-1443
---
.../org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java | 8 ++++++--
.../org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java | 4 ++--
.../apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java | 5 ++++-
3 files changed, 12 insertions(+), 5 deletions(-)
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index a5d0c72..0871fb7 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -96,7 +96,9 @@ public class GobblinMCEPublisher extends DataPublisher {
if (newFiles.isEmpty()) {
// There'll be only one dummy file here. This file is parsed for DB
and table name calculation.
newFiles = computeDummyFile(state);
- this.producer.sendGMCE(newFiles, null, null, offsetRange,
OperationType.change_property, SchemaSource.NONE);
+ if (!newFiles.isEmpty()) {
+ this.producer.sendGMCE(newFiles, null, null, offsetRange,
OperationType.change_property, SchemaSource.NONE);
+ }
} else {
this.producer.sendGMCE(newFiles, null, null, offsetRange,
OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
}
@@ -150,7 +152,9 @@ public class GobblinMCEPublisher extends DataPublisher {
//
PriorityQueue<FileStatus> fileStatuses =
new PriorityQueue<>((x, y) -> Long.compare(y.getModificationTime(),
x.getModificationTime()));
- fileStatuses.add(fs.getFileStatus(path));
+ if (fs.exists(path)) {
+ fileStatuses.add(fs.getFileStatus(path));
+ }
// Only register files
while (!fileStatuses.isEmpty()) {
FileStatus fileStatus = fileStatuses.poll();
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 02fbb2e..75cbb07 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -120,7 +120,7 @@ import org.joda.time.format.PeriodFormatterBuilder;
public class IcebergMetadataWriter implements MetadataWriter {
public static final String USE_DATA_PATH_AS_TABLE_LOCATION =
"use.data.path.as.table.location";
- public static final String TABLE_LOCATION_SUFFIX = "/_iceberg_metadata";
+ public static final String TABLE_LOCATION_SUFFIX = "/_iceberg_metadata/%s";
public static final String GMCE_HIGH_WATERMARK_KEY =
"gmce.high.watermark.%s";
public static final String GMCE_LOW_WATERMARK_KEY = "gmce.low.watermark.%s";
private final static String EXPIRE_SNAPSHOTS_LOOKBACK_TIME =
"gobblin.iceberg.dataset.expire.snapshots.lookBackTime";
@@ -409,7 +409,7 @@ public class IcebergMetadataWriter implements MetadataWriter {
Table icebergTable = null;
String tableLocation = null;
if (useDataLoacationAsTableLocation) {
- tableLocation = gmce.getDatasetIdentifier().getNativeName() +
TABLE_LOCATION_SUFFIX;
+ tableLocation = gmce.getDatasetIdentifier().getNativeName() +
String.format(TABLE_LOCATION_SUFFIX, table.getDbName());
//Set the path permission
Path tablePath = new Path(tableLocation);
WriterUtils.mkdirsWithRecursivePermission(tablePath.getFileSystem(conf),
tablePath, permission);
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index cb1c32c..d6ec191 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -117,7 +117,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
.setDatasetIdentifier(DatasetIdentifier.newBuilder()
.setDataOrigin(DataOrigin.EI)
.setDataPlatformUrn("urn:li:dataPlatform:hdfs")
- .setNativeName("/data/tracking/testIcebergTable")
+ .setNativeName(new File(tmpDir,
"data/tracking/testIcebergTable").getAbsolutePath())
.build())
.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "0-1000").build())
.setFlowId("testFlow")
@@ -138,6 +138,7 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
state.setProp("default.hive.registration.policy",
TestHiveRegistrationPolicyForIceberg.class.getName());
+ state.setProp("use.data.path.as.table.location", true);
gobblinMCEWriter = new GobblinMCEWriter(new GobblinMCEWriterBuilder(),
state);
((IcebergMetadataWriter)
gobblinMCEWriter.getMetadataWriters().iterator().next()).setCatalog(
HiveMetastoreTest.catalog);
@@ -154,6 +155,8 @@ public class IcebergMetadataWriterTest extends HiveMetastoreTest {
Assert.assertEquals(catalog.listTables(Namespace.of(dbName)).size(), 1);
Table table =
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertFalse(table.properties().containsKey("offset.range.testTopic-1"));
+ Assert.assertEquals(table.location(),
+ new File(tmpDir,
"data/tracking/testIcebergTable/_iceberg_metadata/").getAbsolutePath() + "/" +
dbName);
gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "1000-2000").build());
gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
new KafkaStreamingExtractor.KafkaWatermark(