[
https://issues.apache.org/jira/browse/GOBBLIN-1413?focusedWorklogId=572955&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-572955
]
ASF GitHub Bot logged work on GOBBLIN-1413:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 27/Mar/21 00:31
Start Date: 27/Mar/21 00:31
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on a change in pull request #3252:
URL: https://github.com/apache/gobblin/pull/3252#discussion_r602639737
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -132,6 +136,36 @@ public void publishData(Collection<? extends
WorkUnitState> states) throws IOExc
return newFiles;
}
+ /**
+ * Choose one file from the work unit state. There will be no modification
to the file.
+ * It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the
DB and table name.
+ * @throws IOException
+ */
+ private Map<Path, Metrics> computeDummyFile (State state) throws IOException
{
Review comment:
This method ends up listing all files under the dataset and picking only the
oldest one, which the retention pipeline might remove shortly afterwards. Can
you change the algorithm to pick the latest available data file instead?
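A minimal sketch of the "pick the latest file" idea, not the actual patch.
DataFile is a hypothetical stand-in for a Hadoop FileStatus (path plus
modification time), and LatestFilePicker and pickLatest are illustrative names
that do not exist in Gobblin:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-in for a file listing entry: a path and its modification time. */
final class DataFile {
  final String path;
  final long modificationTimeMillis;

  DataFile(String path, long modificationTimeMillis) {
    this.path = path;
    this.modificationTimeMillis = modificationTimeMillis;
  }
}

/** Illustrative helper (assumed name): selects the newest file instead of the oldest. */
final class LatestFilePicker {
  private LatestFilePicker() {}

  /** Returns the most recently modified file, or empty when there are no candidates. */
  static Optional<DataFile> pickLatest(List<DataFile> candidates) {
    return candidates.stream()
        .max(Comparator.comparingLong((DataFile f) -> f.modificationTimeMillis));
  }

  public static void main(String[] args) {
    List<DataFile> files = Arrays.asList(
        new DataFile("/data/db/table/part-0.avro", 100L),
        new DataFile("/data/db/table/part-1.avro", 300L),
        new DataFile("/data/db/table/part-2.avro", 200L));
    // Prints the path of the newest file if one exists.
    pickLatest(files).ifPresent(f -> System.out.println(f.path));
  }
}
```

The newest file is the one least likely to be deleted by retention soon, which
is the concern raised above. This sketch still assumes the full listing is
available; it does not address the cost of listing every file.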
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/GobblinMCEProducer.java
##########
@@ -198,6 +198,15 @@ private boolean verifyInput(Map<Path, Metrics> newFiles,
List<String> oldFiles,
}
break;
}
+ case change_property: {
+ if(newFiles != null || oldFiles != null) {
+ log.warn("{} new files and {} old files detected while no file
alteration is performed",
Review comment:
Since we now always set newFiles to a dummy file, this condition will always
be true for change_property. Do you think we need to modify the condition for
the warn log?
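One possible shape for the adjusted condition, as a sketch only. It assumes
that a change_property event legitimately carries exactly one dummy entry in
newFiles, so a warning is warranted only when there is more than one new file
or any old file. ChangePropertyInputCheck and shouldWarn are hypothetical
names, and the map key type is simplified to String:

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: decides whether a change_property event looks suspicious. */
final class ChangePropertyInputCheck {
  private ChangePropertyInputCheck() {}

  /**
   * Returns true when the input suggests real file alteration: more than the
   * single dummy entry in newFiles, or any entry in oldFiles.
   */
  static boolean shouldWarn(Map<String, ?> newFiles, List<String> oldFiles) {
    int newCount = newFiles == null ? 0 : newFiles.size();
    int oldCount = oldFiles == null ? 0 : oldFiles.size();
    return newCount > 1 || oldCount > 0;
  }

  public static void main(String[] args) {
    // A single dummy file and no old files: no warning expected.
    Map<String, String> dummyOnly = Collections.singletonMap("/data/db/table/dummy.avro", "");
    System.out.println(shouldWarn(dummyOnly, null));
  }
}
```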
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -229,7 +229,7 @@ private Long getCurrentWaterMark(TableIdentifier tid,
String topicPartition) {
* The logic of this function will be:
* 1. Check whether a table exists, if not then create the iceberg table
* 2. Compute schema from the gmce and update the cache for candidate schemas
- * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile or
dropFile
+ * 3. Do the required operation of the gmce, i.e. addFile, rewriteFile or
dropFile. change_property means no operation needed
Review comment:
Does change_property mean that we only update table-level properties and do
not modify data?
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -132,6 +136,36 @@ public void publishData(Collection<? extends
WorkUnitState> states) throws IOExc
return newFiles;
}
+ /**
+ * Choose one file from the work unit state. There will be no modification
to the file.
+ * It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the
DB and table name.
+ * @throws IOException
+ */
+ private Map<Path, Metrics> computeDummyFile (State state) throws IOException
{
+ Map<Path, Metrics> newFiles = new HashMap<>();
+ NameMapping mapping = getNameMapping();
Review comment:
In this case, we don't need file metrics, as it's just a dummy file. Also,
getNameMapping requires the latest schema to be set, but it's possible that we
don't have this property when no data has been written out.
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -275,6 +275,14 @@ public void write(GobblinMetadataChangeEvent gmce,
Map<String, Collection<HiveSp
dropFiles(gmce, oldSpecsMap, table, tableMetadata, tid);
break;
}
+ case change_property: {
Review comment:
On line 244, we may also want to skip processing the event if the
operation type is change_property and the table does not exist?
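A sketch of the guard being suggested, under the assumption that a
change_property event carries no data and so should not create a missing
table. EventGate and shouldSkip are hypothetical names, and the enum below is
a local stand-in rather than Gobblin's generated OperationType:

```java
/** Hypothetical guard: decides whether an event should be skipped before any table work. */
final class EventGate {
  private EventGate() {}

  /** Local stand-in for the operation types mentioned in this review. */
  enum OperationType { add_files, drop_files, rewrite_files, change_property }

  /** Skips only property-change events that target a table which does not exist yet. */
  static boolean shouldSkip(OperationType operationType, boolean tableExists) {
    return operationType == OperationType.change_property && !tableExists;
  }

  public static void main(String[] args) {
    System.out.println(shouldSkip(OperationType.change_property, false));
    System.out.println(shouldSkip(OperationType.add_files, false));
  }
}
```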
##########
File path:
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
##########
@@ -92,9 +93,12 @@ public void publishData(Collection<? extends WorkUnitState>
states) throws IOExc
Map<Path, Metrics> newFiles = computeFileMetrics(state);
Map<String, String> offsetRange =
getPartitionOffsetRange(OFFSET_RANGE_KEY);
if (newFiles.isEmpty()) {
- return;
+ // There'll be only one dummy file here. This file is parsed for DB
and table name calculation.
+ newFiles = computeDummyFile(state);
+ this.producer.sendGMCE(newFiles, null, null, offsetRange,
OperationType.change_property, SchemaSource.SCHEMAREGISTRY);
Review comment:
You may want to set the schema source to none in this case, since we don't
want to update the schema.
Issue Time Tracking
-------------------
Worklog Id: (was: 572955)
Time Spent: 20m (was: 10m)
> Emit GMCE as long as watermark moved
> ------------------------------------
>
> Key: GOBBLIN-1413
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1413
> Project: Apache Gobblin
> Issue Type: Bug
> Reporter: Hanghang Liu
> Priority: Critical
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Emit a GMCE (Gobblin Metadata Change Event) whenever the watermark moves in a
> streaming pipeline.
> Currently, a GMCE is not triggered in a streaming pipeline when no new file
> is generated. If the watermark moves while no file is generated (for example,
> because the data was filtered out by the quality checker), the GMCE is
> missed.