This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 1a91c42f [GOBBLIN-1413] Change GMCE publisher to produce GMCE when no
file is generated. Change IcebergMetadataWriter to correctly update watermark.
1a91c42f is described below
commit 1a91c42f19f6d629fb61a77fc234274c42fc2770
Author: hanghangliu <[email protected]>
AuthorDate: Mon Apr 12 14:12:15 2021 -0700
[GOBBLIN-1413] Change GMCE publisher to produce GMCE when no file
is generated. Change IcebergMetadataWriter to correctly update watermark.
add more comments
address comments
choose the latest file when computing the dummy file for
GMCE
fix code format to comply with code style
Closes #3252 from hanghangliu/GOBBLIN-1413-Emit-
GMCE-as-long-as-watermark-moved
---
.../apache/gobblin/iceberg/GobblinMCEProducer.java | 50 ++++++++++++++++-----
.../iceberg/publisher/GobblinMCEPublisher.java | 52 ++++++++++++++++++----
.../iceberg/writer/IcebergMetadataWriter.java | 16 +++++--
.../iceberg/publisher/GobblinMCEPublisherTest.java | 40 +++++++++++++++++
.../iceberg/writer/IcebergMetadataWriterTest.java | 36 ++++++++++++++-
.../src/main/avro/GobblinMetadataChangeEvent.avsc | 3 +-
6 files changed, 171 insertions(+), 26 deletions(-)
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
index 76b276a..d77a4cb 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
@@ -198,6 +198,13 @@ public abstract class GobblinMCEProducer implements
Closeable {
}
break;
}
+ case change_property: {
+ if(oldFiles != null) {
+ log.warn("{} old files detected while no file alteration is
performed", oldFiles.size());
+ }
+ log.info("Setting GMCE while no file changes need to be performed.");
+ break;
+ }
default: {
//unsupported operation
log.error("Unsupported operation type {}", operationType);
@@ -209,18 +216,37 @@ public abstract class GobblinMCEProducer implements
Closeable {
private List<DataFile> toGobblinDataFileList(Map<Path, Metrics> files) {
- return Lists.newArrayList(Iterables.transform(files.entrySet(), file ->
DataFile.newBuilder()
- .setFilePath(file.getKey().toString())
- .setFileFormat(IcebergUtils.getIcebergFormat(state).toString())
- .setFileMetrics(DataMetrics.newBuilder()
- .setRecordCount(file.getValue().recordCount())
-
.setColumnSizes(getIntegerLongPairsFromMap(file.getValue().columnSizes()))
-
.setValueCounts(getIntegerLongPairsFromMap(file.getValue().valueCounts()))
-
.setNullValueCounts(getIntegerLongPairsFromMap(file.getValue().nullValueCounts()))
-
.setLowerBounds(getIntegerBytesPairsFromMap(file.getValue().lowerBounds()))
-
.setUpperBounds(getIntegerBytesPairsFromMap(file.getValue().upperBounds()))
- .build())
- .build()));
+ return Lists.newArrayList(Iterables.transform(files.entrySet(), file ->
+ {
+ DataFile.Builder builder = createBuilderWithFilePath(file.getKey());
+ addMetricsToFileBuilder(builder, file.getValue());
+ return builder.build();
+ }
+ ));
+ }
+
+ private DataFile.Builder createBuilderWithFilePath(Path filePath) {
+ return DataFile.newBuilder()
+ .setFilePath(filePath.toString())
+ .setFileFormat(IcebergUtils.getIcebergFormat(state).toString());
+ }
+
+ private void addMetricsToFileBuilder(DataFile.Builder builder, Metrics
metrics) {
+ // If metrics is null or empty, set FileMetrics a dummy one
+ if(metrics == null || metrics.recordCount() == null) {
+ builder.setFileMetrics(DataMetrics.newBuilder().setRecordCount(0)
+ .build());
+ return;
+ }
+ // If metrics is concrete, fill all fields
+ builder.setFileMetrics(DataMetrics.newBuilder()
+ .setRecordCount(metrics.recordCount())
+ .setColumnSizes(getIntegerLongPairsFromMap(metrics.columnSizes()))
+ .setValueCounts(getIntegerLongPairsFromMap(metrics.valueCounts()))
+
.setNullValueCounts(getIntegerLongPairsFromMap(metrics.nullValueCounts()))
+ .setLowerBounds(getIntegerBytesPairsFromMap(metrics.lowerBounds()))
+ .setUpperBounds(getIntegerBytesPairsFromMap(metrics.upperBounds()))
+ .build());
}
private List<IntegerLongPair> getIntegerLongPairsFromMap(Map<Integer, Long>
map) {
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index d3e6b7f..a5d0c72 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -18,21 +18,23 @@
package org.apache.gobblin.iceberg.publisher;
import com.google.common.io.Closer;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.iceberg.GobblinMCEProducer;
-import org.apache.gobblin.iceberg.Utils.IcebergUtils;
-import org.apache.gobblin.metadata.OperationType;
-import org.apache.gobblin.metadata.SchemaSource;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
+import java.util.PriorityQueue;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.iceberg.GobblinMCEProducer;
+import org.apache.gobblin.iceberg.Utils.IcebergUtils;
+import org.apache.gobblin.iceberg.writer.GobblinMCEWriter;
+import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metadata.SchemaSource;
import org.apache.gobblin.publisher.DataPublisher;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.filters.HiddenFilter;
@@ -92,9 +94,12 @@ public class GobblinMCEPublisher extends DataPublisher {
Map<Path, Metrics> newFiles = computeFileMetrics(state);
Map<String, String> offsetRange =
getPartitionOffsetRange(OFFSET_RANGE_KEY);
if (newFiles.isEmpty()) {
- return;
+ // There'll be only one dummy file here. This file is parsed for DB
and table name calculation.
+ newFiles = computeDummyFile(state);
+ this.producer.sendGMCE(newFiles, null, null, offsetRange,
OperationType.change_property, SchemaSource.NONE);
+ } else {
+ this.producer.sendGMCE(newFiles, null, null, offsetRange,
OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
}
- this.producer.sendGMCE(newFiles, null, null, offsetRange,
OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
}
}
@@ -132,13 +137,44 @@ public class GobblinMCEPublisher extends DataPublisher {
return newFiles;
}
+ /**
+ * Choose the latest file from the work unit state. There will be no
modification to the file.
+ * It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the
DB and table name.
+ * @throws IOException
+ */
+ private Map<Path, Metrics> computeDummyFile(State state) throws IOException {
+ Map<Path, Metrics> newFiles = new HashMap<>();
+ FileSystem fs = FileSystem.get(conf);
+ for (final String pathString :
state.getPropAsList(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, "")) {
+ Path path = new Path(pathString);
+ //
+ PriorityQueue<FileStatus> fileStatuses =
+ new PriorityQueue<>((x, y) -> Long.compare(y.getModificationTime(),
x.getModificationTime()));
+ fileStatuses.add(fs.getFileStatus(path));
+ // Only register files
+ while (!fileStatuses.isEmpty()) {
+ FileStatus fileStatus = fileStatuses.poll();
+ if (fileStatus.isDirectory()) {
+
fileStatuses.addAll(Arrays.asList(fs.listStatus(fileStatus.getPath(),
HIDDEN_FILES_FILTER)));
+ } else {
+ Path filePath = fileStatus.getPath();
+ newFiles.put(filePath, null);
+ // Only one concrete file from the path is needed
+ return newFiles;
+ }
+ }
+ }
+ return newFiles;
+ }
+
protected NameMapping getNameMapping() {
String writerSchema =
state.getProp(PartitionedDataWriter.WRITER_LATEST_SCHEMA);
if (writerSchema == null) {
return null;
}
try {
- org.apache.iceberg.shaded.org.apache.avro.Schema avroSchema = new
org.apache.iceberg.shaded.org.apache.avro.Schema.Parser().parse(writerSchema);
+ org.apache.iceberg.shaded.org.apache.avro.Schema avroSchema =
+ new
org.apache.iceberg.shaded.org.apache.avro.Schema.Parser().parse(writerSchema);
Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
//This conversion is to make sure the schema has the iceberg id setup
state.setProp(AVRO_SCHEMA_WITH_ICEBERG_ID,
AvroSchemaUtil.convert(icebergSchema.asStruct()).toString());
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index e730408..02fbb2e 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -229,7 +229,8 @@ public class IcebergMetadataWriter implements
MetadataWriter {
* The logic of this function will be:
* 1. Check whether a table exists, if not then create the iceberg table
* 2. Compute schema from the gmce and update the cache for candidate schemas
- * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile or
dropFile
+ * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile or
dropFile. change_property means only
+ * update the table level property but no data modification.
* Note: this method only aggregate the metadata in cache without
committing. The actual commit will be done in flush method
*/
public void write(GobblinMetadataChangeEvent gmce, Map<String,
Collection<HiveSpec>> newSpecsMap,
@@ -241,8 +242,9 @@ public class IcebergMetadataWriter implements
MetadataWriter {
table = getIcebergTable(tid);
} catch (NoSuchTableException e) {
try {
- if (gmce.getOperationType() == OperationType.drop_files) {
- log.warn("Table {} does not exist, skip processing this drop_file
event", tid.toString());
+ if (gmce.getOperationType() == OperationType.drop_files ||
+ gmce.getOperationType() == OperationType.change_property) {
+ log.warn("Table {} does not exist, skip processing this {} event",
tid.toString(), gmce.getOperationType());
return;
}
table = createTable(gmce, tableSpec);
@@ -275,6 +277,14 @@ public class IcebergMetadataWriter implements
MetadataWriter {
dropFiles(gmce, oldSpecsMap, table, tableMetadata, tid);
break;
}
+ case change_property: {
+ updateTableProperty(tableSpec, tid);
+ if (gmce.getTopicPartitionOffsetsRange() != null) {
+ mergeOffsets(gmce, tid);
+ }
+ log.info("No file operation need to be performed by Iceberg Metadata
Writer at this point.");
+ break;
+ }
default: {
log.error("unsupported operation {}",
gmce.getOperationType().toString());
return;
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
index 63cdce2..c5a4e38 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
@@ -231,6 +231,35 @@ public class GobblinMCEPublisherTest {
publisher.publishData(Arrays.asList(state));
}
+ @Test (dependsOnMethods = {"testPublishGMCEForAvro"})
+ public void testPublishGMCEWithoutFile() throws IOException {
+ GobblinMCEProducer producer = Mockito.mock(GobblinMCEProducer.class);
+ Mockito.doCallRealMethod()
+ .when(producer)
+ .getGobblinMetadataChangeEvent(anyMap(), anyList(), anyList(),
anyMap(), any(), any());
+ Mockito.doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+ GobblinMetadataChangeEvent gmce =
+ producer.getGobblinMetadataChangeEvent((Map<Path, Metrics>)
args[0], null, null,
+ (Map<String, String>) args[1], OperationType.change_property,
SchemaSource.NONE);
+ Assert.assertEquals(gmce.getNewFiles().size(), 1);
+ Assert.assertNull(gmce.getOldFiles());
+ Assert.assertNull(gmce.getOldFilePrefixes());
+ Assert.assertEquals(gmce.getOperationType(),
OperationType.change_property);
+ return null;
+ }
+ }).when(producer).sendGMCE(anyMap(), anyList(), anyList(), anyMap(),
any(), any());
+
+ WorkUnitState state = new WorkUnitState();
+ setGMCEPublisherStateWithoutNewFile(state);
+ Mockito.doCallRealMethod().when(producer).setState(state);
+ producer.setState(state);
+ GobblinMCEPublisher publisher = new GobblinMCEPublisher(state, producer);
+ publisher.publishData(Arrays.asList(state));
+ }
+
private void setGMCEPublisherStateForOrcFile(WorkUnitState state) {
state.setProp(GobblinMCEPublisher.NEW_FILES_LIST, orcFilePath.toString());
state.setProp(ConfigurationKeys.WRITER_OUTPUT_FORMAT_KEY, "ORC");
@@ -242,6 +271,17 @@ public class GobblinMCEPublisherTest {
state.setProp(PartitionedDataWriter.WRITER_LATEST_SCHEMA, orcSchema);
}
+ private void setGMCEPublisherStateWithoutNewFile(WorkUnitState state) {
+ //state.setProp(GobblinMCEPublisher.NEW_FILES_LIST, dataFile.toString());
+ state.setProp(ConfigurationKeys.WRITER_OUTPUT_FORMAT_KEY, "AVRO");
+ state.setProp(GobblinMCEPublisher.OFFSET_RANGE_KEY, "testTopic-1:0-1000");
+ state.setProp(ConfigurationKeys.HIVE_REGISTRATION_POLICY,
+ HiveSnapshotRegistrationPolicy.class.getCanonicalName());
+ state.setProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR,
datasetDir.toString());
+ state.setProp(AbstractJob.JOB_ID, "testFlow");
+ state.setProp(PartitionedDataWriter.WRITER_LATEST_SCHEMA,
_avroPartitionSchema);
+ }
+
private void setGMCEPublisherStateForAvroFile(WorkUnitState state) {
state.setProp(GobblinMCEPublisher.NEW_FILES_LIST, dataFile.toString());
state.setProp(ConfigurationKeys.WRITER_OUTPUT_FORMAT_KEY, "AVRO");
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 88f72f9..cb1c32c 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.iceberg.FindFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.hive.HiveMetastoreTest;
import org.testng.Assert;
@@ -144,7 +145,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
SchemaBuilder.record("partitionTest").fields().name("ds").type().optional().stringType().endRecord();
}
- @Test ( priority = 1 )
+ @Test ( priority = 0 )
public void testWriteAddFileGMCE() throws IOException {
gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
new KafkaStreamingExtractor.KafkaWatermark(
@@ -197,7 +198,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
}
//Make sure hive test execute later and close the metastore
- @Test( priority = 2 )
+ @Test( priority = 1 )
public void testWriteRewriteFileGMCE() throws IOException {
gmce.setTopicPartitionOffsetsRange(null);
FileSystem fs = FileSystem.get(new Configuration());
@@ -232,6 +233,37 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Assert.assertFalse(result.hasNext());
}
+ @Test( priority = 2 )
+ public void testChangeProperty() throws IOException {
+ Table table =
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+ Assert.assertEquals(table.properties().get("offset.range.testTopic-1"),
"0-3000");
+ Assert.assertEquals(table.currentSnapshot().allManifests().size(), 3);
+
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
"30");
+
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
"40");
+
+ gmce.setOldFilePrefixes(null);
+ DataFile dailyFile = DataFile.newBuilder()
+ .setFilePath(dailyDataFile.toString())
+ .setFileFormat("avro")
+ .setFileMetrics(DataMetrics.newBuilder().setRecordCount(0L).build())
+ .build();
+ gmce.setNewFiles(Lists.newArrayList(dailyFile));
+ gmce.setOperationType(OperationType.change_property);
+ gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "2000-4000").build());
+ gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
+ new KafkaStreamingExtractor.KafkaWatermark(
+ new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+ new LongWatermark(45L))));
+ gobblinMCEWriter.flush();
+ table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+ // Assert the offset has been updated
+ Assert.assertEquals(table.properties().get("offset.range.testTopic-1"),
"0-4000");
+ Assert.assertEquals(table.currentSnapshot().allManifests().size(), 3);
+ // Assert low watermark and high watermark set properly
+
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
"40");
+
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
"45");
+ }
+
private String writeRecord(File file) throws IOException {
GenericData.Record record = new GenericData.Record(avroDataSchema);
record.put("id", 1L);
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
index 2226934..aa3dd8b 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
@@ -143,7 +143,8 @@
"symbols": [
"add_files",
"drop_files",
- "rewrite_files"
+ "rewrite_files",
+ "change_property"
]
},
"doc": "This is the operation type which indicates change for the
specific files, for purger we don't need to do hive registration",