This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a91c42f [GOBBLIN-1413] Change GMCE publisher to produce GMCE when no 
file generated. Change IcebergMetadataWriter to correctly update watermark.
1a91c42f is described below

commit 1a91c42f19f6d629fb61a77fc234274c42fc2770
Author: hanghangliu <[email protected]>
AuthorDate: Mon Apr 12 14:12:15 2021 -0700

    [GOBBLIN-1413] Change GMCE publisher to produce GMCE when no file 
generated. Change IcebergMetadataWriter to correctly update watermark.
    
    add more comments
    
    address comments
    
    choose the latest file when compute dummy file for
    GMCE
    
    fix code format to comply with code style
    
    Closes #3252 from hanghangliu/GOBBLIN-1413-Emit-
    GMCE-as-long-as-watermark-moved
---
 .../apache/gobblin/iceberg/GobblinMCEProducer.java | 50 ++++++++++++++++-----
 .../iceberg/publisher/GobblinMCEPublisher.java     | 52 ++++++++++++++++++----
 .../iceberg/writer/IcebergMetadataWriter.java      | 16 +++++--
 .../iceberg/publisher/GobblinMCEPublisherTest.java | 40 +++++++++++++++++
 .../iceberg/writer/IcebergMetadataWriterTest.java  | 36 ++++++++++++++-
 .../src/main/avro/GobblinMetadataChangeEvent.avsc  |  3 +-
 6 files changed, 171 insertions(+), 26 deletions(-)

diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
index 76b276a..d77a4cb 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
@@ -198,6 +198,13 @@ public abstract class GobblinMCEProducer implements 
Closeable {
         }
         break;
       }
+      case change_property: {
+        if(oldFiles != null) {
+          log.warn("{} old files detected while no file alteration is 
performed", oldFiles.size());
+        }
+        log.info("Setting GMCE while no file changes need to be performed.");
+        break;
+      }
       default: {
         //unsupported operation
         log.error("Unsupported operation type {}", operationType);
@@ -209,18 +216,37 @@ public abstract class GobblinMCEProducer implements 
Closeable {
 
 
   private List<DataFile> toGobblinDataFileList(Map<Path, Metrics> files) {
-    return Lists.newArrayList(Iterables.transform(files.entrySet(), file -> 
DataFile.newBuilder()
-        .setFilePath(file.getKey().toString())
-        .setFileFormat(IcebergUtils.getIcebergFormat(state).toString())
-        .setFileMetrics(DataMetrics.newBuilder()
-            .setRecordCount(file.getValue().recordCount())
-            
.setColumnSizes(getIntegerLongPairsFromMap(file.getValue().columnSizes()))
-            
.setValueCounts(getIntegerLongPairsFromMap(file.getValue().valueCounts()))
-            
.setNullValueCounts(getIntegerLongPairsFromMap(file.getValue().nullValueCounts()))
-            
.setLowerBounds(getIntegerBytesPairsFromMap(file.getValue().lowerBounds()))
-            
.setUpperBounds(getIntegerBytesPairsFromMap(file.getValue().upperBounds()))
-            .build())
-        .build()));
+    return Lists.newArrayList(Iterables.transform(files.entrySet(), file ->
+        {
+          DataFile.Builder builder = createBuilderWithFilePath(file.getKey());
+          addMetricsToFileBuilder(builder, file.getValue());
+          return builder.build();
+        }
+    ));
+  }
+
+  private DataFile.Builder createBuilderWithFilePath(Path filePath) {
+    return DataFile.newBuilder()
+        .setFilePath(filePath.toString())
+        .setFileFormat(IcebergUtils.getIcebergFormat(state).toString());
+  }
+
+  private void addMetricsToFileBuilder(DataFile.Builder builder, Metrics 
metrics) {
+    // If metrics is null or empty, set FileMetrics to a dummy one
+    if(metrics == null || metrics.recordCount() == null) {
+      builder.setFileMetrics(DataMetrics.newBuilder().setRecordCount(0)
+          .build());
+      return;
+    }
+    // If metrics is concrete, fill all fields
+    builder.setFileMetrics(DataMetrics.newBuilder()
+        .setRecordCount(metrics.recordCount())
+        .setColumnSizes(getIntegerLongPairsFromMap(metrics.columnSizes()))
+        .setValueCounts(getIntegerLongPairsFromMap(metrics.valueCounts()))
+        
.setNullValueCounts(getIntegerLongPairsFromMap(metrics.nullValueCounts()))
+        .setLowerBounds(getIntegerBytesPairsFromMap(metrics.lowerBounds()))
+        .setUpperBounds(getIntegerBytesPairsFromMap(metrics.upperBounds()))
+        .build());
   }
 
   private List<IntegerLongPair> getIntegerLongPairsFromMap(Map<Integer, Long> 
map) {
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index d3e6b7f..a5d0c72 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -18,21 +18,23 @@
 package org.apache.gobblin.iceberg.publisher;
 
 import com.google.common.io.Closer;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.iceberg.GobblinMCEProducer;
-import org.apache.gobblin.iceberg.Utils.IcebergUtils;
-import org.apache.gobblin.metadata.OperationType;
-import org.apache.gobblin.metadata.SchemaSource;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.iceberg.GobblinMCEProducer;
+import org.apache.gobblin.iceberg.Utils.IcebergUtils;
+import org.apache.gobblin.iceberg.writer.GobblinMCEWriter;
+import org.apache.gobblin.metadata.OperationType;
+import org.apache.gobblin.metadata.SchemaSource;
 import org.apache.gobblin.publisher.DataPublisher;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.filters.HiddenFilter;
@@ -92,9 +94,12 @@ public class GobblinMCEPublisher extends DataPublisher {
       Map<Path, Metrics> newFiles = computeFileMetrics(state);
       Map<String, String> offsetRange = 
getPartitionOffsetRange(OFFSET_RANGE_KEY);
       if (newFiles.isEmpty()) {
-        return;
+        // There'll be only one dummy file here. This file is parsed for DB 
and table name calculation.
+        newFiles = computeDummyFile(state);
+        this.producer.sendGMCE(newFiles, null, null, offsetRange, 
OperationType.change_property, SchemaSource.NONE);
+      } else {
+        this.producer.sendGMCE(newFiles, null, null, offsetRange, 
OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
       }
-      this.producer.sendGMCE(newFiles, null, null, offsetRange, 
OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
     }
   }
 
@@ -132,13 +137,44 @@ public class GobblinMCEPublisher extends DataPublisher {
     return newFiles;
   }
 
+  /**
+   * Choose the latest file from the work unit state. There will be no 
modification to the file.
+   * It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the 
DB and table name.
+   * @throws IOException
+   */
+  private Map<Path, Metrics> computeDummyFile(State state) throws IOException {
+    Map<Path, Metrics> newFiles = new HashMap<>();
+    FileSystem fs = FileSystem.get(conf);
+    for (final String pathString : 
state.getPropAsList(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, "")) {
+      Path path = new Path(pathString);
+      // Order entries by modification time, newest first, so the latest file is chosen
+      PriorityQueue<FileStatus> fileStatuses =
+          new PriorityQueue<>((x, y) -> Long.compare(y.getModificationTime(), 
x.getModificationTime()));
+      fileStatuses.add(fs.getFileStatus(path));
+      // Skip directories; return as soon as the first (most recent) concrete file is found
+      while (!fileStatuses.isEmpty()) {
+        FileStatus fileStatus = fileStatuses.poll();
+        if (fileStatus.isDirectory()) {
+          
fileStatuses.addAll(Arrays.asList(fs.listStatus(fileStatus.getPath(), 
HIDDEN_FILES_FILTER)));
+        } else {
+          Path filePath = fileStatus.getPath();
+          newFiles.put(filePath, null);
+          // Only one concrete file from the path is needed
+          return newFiles;
+        }
+      }
+    }
+    return newFiles;
+  }
+
   protected NameMapping getNameMapping() {
     String writerSchema = 
state.getProp(PartitionedDataWriter.WRITER_LATEST_SCHEMA);
     if (writerSchema == null) {
       return null;
     }
     try {
-      org.apache.iceberg.shaded.org.apache.avro.Schema avroSchema = new 
org.apache.iceberg.shaded.org.apache.avro.Schema.Parser().parse(writerSchema);
+      org.apache.iceberg.shaded.org.apache.avro.Schema avroSchema =
+          new 
org.apache.iceberg.shaded.org.apache.avro.Schema.Parser().parse(writerSchema);
       Schema icebergSchema = AvroSchemaUtil.toIceberg(avroSchema);
       //This conversion is to make sure the schema has the iceberg id setup
       state.setProp(AVRO_SCHEMA_WITH_ICEBERG_ID, 
AvroSchemaUtil.convert(icebergSchema.asStruct()).toString());
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index e730408..02fbb2e 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -229,7 +229,8 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
    * The logic of this function will be:
    * 1. Check whether a table exists, if not then create the iceberg table
    * 2. Compute schema from the gmce and update the cache for candidate schemas
-   * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile or 
dropFile
+   * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile or 
dropFile. change_property means only
+   * update the table level property but no data modification.
    * Note: this method only aggregate the metadata in cache without 
committing. The actual commit will be done in flush method
    */
   public void write(GobblinMetadataChangeEvent gmce, Map<String, 
Collection<HiveSpec>> newSpecsMap,
@@ -241,8 +242,9 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
       table = getIcebergTable(tid);
     } catch (NoSuchTableException e) {
       try {
-        if (gmce.getOperationType() == OperationType.drop_files) {
-          log.warn("Table {} does not exist, skip processing this drop_file 
event", tid.toString());
+        if (gmce.getOperationType() == OperationType.drop_files ||
+            gmce.getOperationType() == OperationType.change_property) {
+          log.warn("Table {} does not exist, skip processing this {} event", 
tid.toString(), gmce.getOperationType());
           return;
         }
         table = createTable(gmce, tableSpec);
@@ -275,6 +277,14 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
         dropFiles(gmce, oldSpecsMap, table, tableMetadata, tid);
         break;
       }
+      case change_property: {
+        updateTableProperty(tableSpec, tid);
+        if (gmce.getTopicPartitionOffsetsRange() != null) {
+          mergeOffsets(gmce, tid);
+        }
+        log.info("No file operation need to be performed by Iceberg Metadata 
Writer at this point.");
+        break;
+      }
       default: {
         log.error("unsupported operation {}", 
gmce.getOperationType().toString());
         return;
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
index 63cdce2..c5a4e38 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisherTest.java
@@ -231,6 +231,35 @@ public class GobblinMCEPublisherTest {
     publisher.publishData(Arrays.asList(state));
   }
 
+  @Test (dependsOnMethods = {"testPublishGMCEForAvro"})
+  public void testPublishGMCEWithoutFile() throws IOException {
+    GobblinMCEProducer producer = Mockito.mock(GobblinMCEProducer.class);
+    Mockito.doCallRealMethod()
+        .when(producer)
+        .getGobblinMetadataChangeEvent(anyMap(), anyList(), anyList(), 
anyMap(), any(), any());
+    Mockito.doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        GobblinMetadataChangeEvent gmce =
+            producer.getGobblinMetadataChangeEvent((Map<Path, Metrics>) 
args[0], null, null,
+                (Map<String, String>) args[1], OperationType.change_property, 
SchemaSource.NONE);
+        Assert.assertEquals(gmce.getNewFiles().size(), 1);
+        Assert.assertNull(gmce.getOldFiles());
+        Assert.assertNull(gmce.getOldFilePrefixes());
+        Assert.assertEquals(gmce.getOperationType(), 
OperationType.change_property);
+        return null;
+      }
+    }).when(producer).sendGMCE(anyMap(), anyList(), anyList(), anyMap(), 
any(), any());
+
+    WorkUnitState state = new WorkUnitState();
+    setGMCEPublisherStateWithoutNewFile(state);
+    Mockito.doCallRealMethod().when(producer).setState(state);
+    producer.setState(state);
+    GobblinMCEPublisher publisher = new GobblinMCEPublisher(state, producer);
+    publisher.publishData(Arrays.asList(state));
+  }
+
   private void setGMCEPublisherStateForOrcFile(WorkUnitState state) {
     state.setProp(GobblinMCEPublisher.NEW_FILES_LIST, orcFilePath.toString());
     state.setProp(ConfigurationKeys.WRITER_OUTPUT_FORMAT_KEY, "ORC");
@@ -242,6 +271,17 @@ public class GobblinMCEPublisherTest {
     state.setProp(PartitionedDataWriter.WRITER_LATEST_SCHEMA, orcSchema);
   }
 
+  private void setGMCEPublisherStateWithoutNewFile(WorkUnitState state) {
+    //state.setProp(GobblinMCEPublisher.NEW_FILES_LIST, dataFile.toString());
+    state.setProp(ConfigurationKeys.WRITER_OUTPUT_FORMAT_KEY, "AVRO");
+    state.setProp(GobblinMCEPublisher.OFFSET_RANGE_KEY, "testTopic-1:0-1000");
+    state.setProp(ConfigurationKeys.HIVE_REGISTRATION_POLICY,
+        HiveSnapshotRegistrationPolicy.class.getCanonicalName());
+    state.setProp(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR, 
datasetDir.toString());
+    state.setProp(AbstractJob.JOB_ID, "testFlow");
+    state.setProp(PartitionedDataWriter.WRITER_LATEST_SCHEMA, 
_avroPartitionSchema);
+  }
+
   private void setGMCEPublisherStateForAvroFile(WorkUnitState state) {
     state.setProp(GobblinMCEPublisher.NEW_FILES_LIST, dataFile.toString());
     state.setProp(ConfigurationKeys.WRITER_OUTPUT_FORMAT_KEY, "AVRO");
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 88f72f9..cb1c32c 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.FindFiles;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hive.HiveMetastoreTest;
 import org.testng.Assert;
@@ -144,7 +145,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
         
SchemaBuilder.record("partitionTest").fields().name("ds").type().optional().stringType().endRecord();
   }
 
-  @Test ( priority = 1 )
+  @Test ( priority = 0 )
   public void testWriteAddFileGMCE() throws IOException {
     gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
         new KafkaStreamingExtractor.KafkaWatermark(
@@ -197,7 +198,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
   }
 
   //Make sure hive test execute later and close the metastore
-  @Test( priority = 2 )
+  @Test( priority = 1 )
   public void testWriteRewriteFileGMCE() throws IOException {
     gmce.setTopicPartitionOffsetsRange(null);
     FileSystem fs = FileSystem.get(new Configuration());
@@ -232,6 +233,37 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     Assert.assertFalse(result.hasNext());
   }
 
+  @Test( priority = 2 )
+  public void testChangeProperty() throws IOException {
+    Table table = 
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+    Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), 
"0-3000");
+    Assert.assertEquals(table.currentSnapshot().allManifests().size(), 3);
+    
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
 "30");
+    
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
 "40");
+
+    gmce.setOldFilePrefixes(null);
+    DataFile dailyFile = DataFile.newBuilder()
+        .setFilePath(dailyDataFile.toString())
+        .setFileFormat("avro")
+        .setFileMetrics(DataMetrics.newBuilder().setRecordCount(0L).build())
+        .build();
+    gmce.setNewFiles(Lists.newArrayList(dailyFile));
+    gmce.setOperationType(OperationType.change_property);
+    gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, 
String>builder().put("testTopic-1", "2000-4000").build());
+    gobblinMCEWriter.writeEnvelope(new RecordEnvelope<>(gmce,
+        new KafkaStreamingExtractor.KafkaWatermark(
+            new 
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+            new LongWatermark(45L))));
+    gobblinMCEWriter.flush();
+    table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+    // Assert the offset has been updated
+    Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), 
"0-4000");
+    Assert.assertEquals(table.currentSnapshot().allManifests().size(), 3);
+    // Assert low watermark and high watermark set properly
+    
Assert.assertEquals(table.properties().get("gmce.low.watermark.GobblinMetadataChangeEvent_test-1"),
 "40");
+    
Assert.assertEquals(table.properties().get("gmce.high.watermark.GobblinMetadataChangeEvent_test-1"),
 "45");
+  }
+
   private String writeRecord(File file) throws IOException {
     GenericData.Record record = new GenericData.Record(avroDataSchema);
     record.put("id", 1L);
diff --git 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
index 2226934..aa3dd8b 100644
--- 
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
+++ 
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/avro/GobblinMetadataChangeEvent.avsc
@@ -143,7 +143,8 @@
           "symbols": [
             "add_files",
             "drop_files",
-            "rewrite_files"
+            "rewrite_files",
+            "change_property"
           ]
         },
         "doc": "This is the operation type which indicates change for the 
specific files, for purger we don't need to do hive registration",

Reply via email to