[ 
https://issues.apache.org/jira/browse/GOBBLIN-1413?focusedWorklogId=572955&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-572955
 ]

ASF GitHub Bot logged work on GOBBLIN-1413:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Mar/21 00:31
            Start Date: 27/Mar/21 00:31
    Worklog Time Spent: 10m 
      Work Description: ZihanLi58 commented on a change in pull request #3252:
URL: https://github.com/apache/gobblin/pull/3252#discussion_r602639737



##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -132,6 +136,36 @@ public void publishData(Collection<? extends 
WorkUnitState> states) throws IOExc
     return newFiles;
   }
 
+  /**
+   * Choose one file from the work unit state. There will be no modification 
to the file.
+   * It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the 
DB and table name.
+   * @throws IOException
+   */
+  private Map<Path, Metrics> computeDummyFile (State state) throws IOException 
{

Review comment:
       This method will end up with list all files under the dataset, and only 
pick up the oldest one. And the oldest file might be removed by retention 
pipeline in a short time. Can you modify the algorithm to get the latest 
available data file?

##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
##########
@@ -198,6 +198,15 @@ private boolean verifyInput(Map<Path, Metrics> newFiles, 
List<String> oldFiles,
         }
         break;
       }
+      case change_property: {
+        if(newFiles != null || oldFiles != null) {
+          log.warn("{} new files and {} old files detected while no file 
alteration is performed",

Review comment:
       Since we always set new files to a dummy file, do you think we need to 
modify the condition for the warn log?

##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -229,7 +229,7 @@ private Long getCurrentWaterMark(TableIdentifier tid, 
String topicPartition) {
    * The logic of this function will be:
    * 1. Check whether a table exists, if not then create the iceberg table
    * 2. Compute schema from the gmce and update the cache for candidate schemas
-   * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile or 
dropFile
+   * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile or 
dropFile. change_property means no operation needed

Review comment:
       change_property means we only update the table level property but not 
modify data?

##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -132,6 +136,36 @@ public void publishData(Collection<? extends 
WorkUnitState> states) throws IOExc
     return newFiles;
   }
 
+  /**
+   * Choose one file from the work unit state. There will be no modification 
to the file.
+   * It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the 
DB and table name.
+   * @throws IOException
+   */
+  private Map<Path, Metrics> computeDummyFile (State state) throws IOException 
{
+    Map<Path, Metrics> newFiles = new HashMap<>();
+    NameMapping mapping = getNameMapping();

Review comment:
       In this case, we don't need file metrics as it's just a dummy file. Also 
get nameMapping require the latest schema is set, but it's possible that we 
don't have this properties when no data written out.

##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -275,6 +275,14 @@ public void write(GobblinMetadataChangeEvent gmce, 
Map<String, Collection<HiveSp
         dropFiles(gmce, oldSpecsMap, table, tableMetadata, tid);
         break;
       }
+      case change_property: {

Review comment:
       On Line 244, we may also want to skip processing the event if the 
operation type is change_property and table does not exist?

##########
File path: 
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -92,9 +93,12 @@ public void publishData(Collection<? extends WorkUnitState> 
states) throws IOExc
       Map<Path, Metrics> newFiles = computeFileMetrics(state);
       Map<String, String> offsetRange = 
getPartitionOffsetRange(OFFSET_RANGE_KEY);
       if (newFiles.isEmpty()) {
-        return;
+        // There'll be only one dummy file here. This file is parsed for DB 
and table name calculation.
+        newFiles = computeDummyFile(state);
+        this.producer.sendGMCE(newFiles, null, null, offsetRange, 
OperationType.change_property, SchemaSource.SCHEMAREGISTRY);

Review comment:
       You may want to set the schema source to be none in this case? since we 
don't want to update the schema?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 572955)
    Time Spent: 20m  (was: 10m)

> Emit GMCE as long as watermark moved
> ------------------------------------
>
>                 Key: GOBBLIN-1413
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1413
>             Project: Apache Gobblin
>          Issue Type: Bug
>            Reporter: Hanghang Liu
>            Priority: Critical
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Emit GMCE(Gobblin Metadata Change Event) as long as watermark moved on 
> streaming pipeline. 
> Currently the GMCE won't be triggered within streaming pipeline if no new 
> file being generated. This causes problem if watermarks moved, while no file 
> being generated(for example, data been filtered out by quality checker), GMCE 
> will be missed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to