ZihanLi58 commented on a change in pull request #3385:
URL: https://github.com/apache/gobblin/pull/3385#discussion_r702103057



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -148,6 +158,28 @@
   /* one of the fields in DataFile entry to describe the location URI of a data file with FS Scheme */
   private static final String ICEBERG_FILE_PATH_COLUMN = DataFile.FILE_PATH.name();
 
+  public static final String ICEBERG_COMPLETENESS_ENABLED = "iceberg.completeness.enabled";
+  private static final boolean DEFAULT_ICEBERG_COMPLETENESS = false;
+  private final boolean completenessEnabled;

Review comment:
       Can we separate the field definitions from the constant definitions?
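
A rough sketch of the suggested layout (the class name and constructor here are hypothetical; only the constant and field names come from the diff): group the config key and its default together in a constants section, and declare the instance field in its own section.

```java
import java.util.Properties;

// Hypothetical sketch of the suggested separation; class name and constructor
// are illustrative, only the constant/field names come from the diff.
public class CompletenessConfigSketch {
  // --- configuration keys and defaults, grouped together ---
  public static final String ICEBERG_COMPLETENESS_ENABLED = "iceberg.completeness.enabled";
  private static final boolean DEFAULT_ICEBERG_COMPLETENESS = false;

  // --- instance fields, declared in their own section ---
  private final boolean completenessEnabled;

  public CompletenessConfigSketch(Properties props) {
    this.completenessEnabled = Boolean.parseBoolean(
        props.getProperty(ICEBERG_COMPLETENESS_ENABLED,
            String.valueOf(DEFAULT_ICEBERG_COMPLETENESS)));
  }

  public boolean isCompletenessEnabled() {
    return completenessEnabled;
  }
}
```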

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -616,23 +689,75 @@ protected void addFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<
   /**
    * Method to get dataFiles with metrics information
    * This method is used to get files to be added to iceberg
+   * if completeness is enabled a new field (late) is added to table schema and partition spec
+   * computed based on datepartition and completion watermark
    * This method will call method {IcebergUtils.getIcebergDataFileWithMetric} to get DataFile for specific file path
    */
-  private Set<DataFile> getIcebergDataFilesToBeAdded(List<org.apache.gobblin.metadata.DataFile> files,
+  private Set<DataFile> getIcebergDataFilesToBeAdded(Table table, TableMetadata tableMetadata, GobblinMetadataChangeEvent gmce, List<org.apache.gobblin.metadata.DataFile> files,
       PartitionSpec partitionSpec, Map<String, Collection<HiveSpec>> newSpecsMap, Map<Integer, Integer> schemaIdMap) {
     Set<DataFile> dataFiles = new HashSet<>();
     for (org.apache.gobblin.metadata.DataFile file : files) {
       try {
-        StructLike partition = getIcebergPartitionVal(newSpecsMap.get(new Path(file.getFilePath()).getParent().toString()),
-            file.getFilePath(), partitionSpec);
-        dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file, partitionSpec, partition, conf, schemaIdMap));
+        Collection<HiveSpec> hiveSpecs = newSpecsMap.get(new Path(file.getFilePath()).getParent().toString());
+        StructLike partition = getIcebergPartitionVal(hiveSpecs, file.getFilePath(), partitionSpec);
+
+        if(tableMetadata.completenessEnabled && gmce.getOperationType() == OperationType.add_files) {

Review comment:
       Do we only have the late partition for hourly data? For daily data we won't have this partition; will that cause any problems?

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -616,23 +689,75 @@ protected void addFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<
   /**
    * Method to get dataFiles with metrics information
    * This method is used to get files to be added to iceberg
+   * if completeness is enabled a new field (late) is added to table schema and partition spec
+   * computed based on datepartition and completion watermark
    * This method will call method {IcebergUtils.getIcebergDataFileWithMetric} to get DataFile for specific file path
    */
-  private Set<DataFile> getIcebergDataFilesToBeAdded(List<org.apache.gobblin.metadata.DataFile> files,
+  private Set<DataFile> getIcebergDataFilesToBeAdded(Table table, TableMetadata tableMetadata, GobblinMetadataChangeEvent gmce, List<org.apache.gobblin.metadata.DataFile> files,
       PartitionSpec partitionSpec, Map<String, Collection<HiveSpec>> newSpecsMap, Map<Integer, Integer> schemaIdMap) {
     Set<DataFile> dataFiles = new HashSet<>();
     for (org.apache.gobblin.metadata.DataFile file : files) {
       try {
-        StructLike partition = getIcebergPartitionVal(newSpecsMap.get(new Path(file.getFilePath()).getParent().toString()),
-            file.getFilePath(), partitionSpec);
-        dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file, partitionSpec, partition, conf, schemaIdMap));
+        Collection<HiveSpec> hiveSpecs = newSpecsMap.get(new Path(file.getFilePath()).getParent().toString());
+        StructLike partition = getIcebergPartitionVal(hiveSpecs, file.getFilePath(), partitionSpec);
+
+        if(tableMetadata.completenessEnabled && gmce.getOperationType() == OperationType.add_files) {
+          // Assumes first partition value to be partitioned by date
+          // TODO Find better way to determine a partition value
+          String datepartition = partition.get(0, null);
+          partition = addLatePartitionValueToIcebergTable(table, tableMetadata,
+              hiveSpecs.iterator().next().getPartition().get(), datepartition);
+          tableMetadata.datePartitions.add(getEpochMillisFromDatepartitionString(datepartition));
+        }
+        dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file, table.spec(), partition, conf, schemaIdMap));
       } catch (Exception e) {
        log.warn("Cannot get DataFile for {} due to {}", file.getFilePath(), e);
       }
     }
     return dataFiles;
   }
 
+  /**
+   * 1. Add "late" partition column to iceberg table if not exists
+   * 2. compute "late" partition value based on datepartition and completion watermark
+   * @param table
+   * @param tableMetadata
+   * @param hivePartition
+   * @param datepartition
+   * @return new iceberg partition value for file
+   */
+  private StructLike addLatePartitionValueToIcebergTable(Table table, TableMetadata tableMetadata, HivePartition hivePartition, String datepartition) {
+    table = addPartitionToIcebergTable(table, newPartitionColumn, newPartitionColumnType);

Review comment:
       Why do we call this again? Shouldn't this be handled at the very beginning of the write?
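
One way to address this (a sketch under assumptions; the guard class and names are hypothetical, not the PR's API) is to make the schema change run at most once per table, so repeated calls on the per-file path become cheap no-ops:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical guard: track which tables already have the "late" column so the
// schema/spec update runs once (e.g. at the start of the write) instead of per file.
public class LateColumnGuard {
  private final Set<String> tablesWithLateColumn = new HashSet<>();

  /** Returns true only on the first call for a given table name. */
  public boolean markLateColumnAdded(String tableName) {
    return tablesWithLateColumn.add(tableName);
  }
}
```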
   

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -616,23 +689,75 @@ protected void addFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<
   /**
    * Method to get dataFiles with metrics information
    * This method is used to get files to be added to iceberg
+   * if completeness is enabled a new field (late) is added to table schema and partition spec
+   * computed based on datepartition and completion watermark
    * This method will call method {IcebergUtils.getIcebergDataFileWithMetric} to get DataFile for specific file path
    */
-  private Set<DataFile> getIcebergDataFilesToBeAdded(List<org.apache.gobblin.metadata.DataFile> files,
+  private Set<DataFile> getIcebergDataFilesToBeAdded(Table table, TableMetadata tableMetadata, GobblinMetadataChangeEvent gmce, List<org.apache.gobblin.metadata.DataFile> files,
       PartitionSpec partitionSpec, Map<String, Collection<HiveSpec>> newSpecsMap, Map<Integer, Integer> schemaIdMap) {
     Set<DataFile> dataFiles = new HashSet<>();
     for (org.apache.gobblin.metadata.DataFile file : files) {
       try {
-        StructLike partition = getIcebergPartitionVal(newSpecsMap.get(new Path(file.getFilePath()).getParent().toString()),
-            file.getFilePath(), partitionSpec);
-        dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file, partitionSpec, partition, conf, schemaIdMap));
+        Collection<HiveSpec> hiveSpecs = newSpecsMap.get(new Path(file.getFilePath()).getParent().toString());
+        StructLike partition = getIcebergPartitionVal(hiveSpecs, file.getFilePath(), partitionSpec);
+
+        if(tableMetadata.completenessEnabled && gmce.getOperationType() == OperationType.add_files) {
+          // Assumes first partition value to be partitioned by date
+          // TODO Find better way to determine a partition value
+          String datepartition = partition.get(0, null);
+          partition = addLatePartitionValueToIcebergTable(table, tableMetadata,
+              hiveSpecs.iterator().next().getPartition().get(), datepartition);
+          tableMetadata.datePartitions.add(getEpochMillisFromDatepartitionString(datepartition));
+        }
+        dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file, table.spec(), partition, conf, schemaIdMap));
       } catch (Exception e) {
        log.warn("Cannot get DataFile for {} due to {}", file.getFilePath(), e);
       }
     }
     return dataFiles;
   }
 
+  /**
+   * 1. Add "late" partition column to iceberg table if not exists
+   * 2. compute "late" partition value based on datepartition and completion watermark
+   * @param table
+   * @param tableMetadata
+   * @param hivePartition
+   * @param datepartition
+   * @return new iceberg partition value for file
+   */
+  private StructLike addLatePartitionValueToIcebergTable(Table table, TableMetadata tableMetadata, HivePartition hivePartition, String datepartition) {

Review comment:
       Can we move these functions to a Util class?

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -616,23 +689,75 @@ protected void addFiles(GobblinMetadataChangeEvent gmce, Map<String, Collection<
   /**
    * Method to get dataFiles with metrics information
    * This method is used to get files to be added to iceberg
+   * if completeness is enabled a new field (late) is added to table schema and partition spec
+   * computed based on datepartition and completion watermark
    * This method will call method {IcebergUtils.getIcebergDataFileWithMetric} to get DataFile for specific file path
    */
-  private Set<DataFile> getIcebergDataFilesToBeAdded(List<org.apache.gobblin.metadata.DataFile> files,
+  private Set<DataFile> getIcebergDataFilesToBeAdded(Table table, TableMetadata tableMetadata, GobblinMetadataChangeEvent gmce, List<org.apache.gobblin.metadata.DataFile> files,
       PartitionSpec partitionSpec, Map<String, Collection<HiveSpec>> newSpecsMap, Map<Integer, Integer> schemaIdMap) {
     Set<DataFile> dataFiles = new HashSet<>();
     for (org.apache.gobblin.metadata.DataFile file : files) {
       try {
-        StructLike partition = getIcebergPartitionVal(newSpecsMap.get(new Path(file.getFilePath()).getParent().toString()),
-            file.getFilePath(), partitionSpec);
-        dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file, partitionSpec, partition, conf, schemaIdMap));
+        Collection<HiveSpec> hiveSpecs = newSpecsMap.get(new Path(file.getFilePath()).getParent().toString());

Review comment:
       For this part, I feel we are making many assumptions, e.g. that there is only one partition column and that it is datepartition. That does not always hold true, and GMIP is supposed to support all generic use cases. Do you think it makes sense to have a separate icebergWriterWithCompletenessCheck for this new feature?
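
A minimal sketch of that split (all class and method names below are hypothetical stand-ins, not the PR's actual API): the generic writer stays assumption-free, and the single-date-partition-column assumption lives only in a completeness-specific subclass.

```java
// Hypothetical sketch of the suggested split; these classes stand in for the
// real IcebergMetadataWriter hierarchy and are not the PR's actual API.
public class WriterHierarchySketch {
  static class GenericIcebergWriter {
    // generic path: no assumptions about partition layout
    String partitionValueFor(String datepartition) {
      return datepartition;
    }
  }

  static class IcebergWriterWithCompletenessCheck extends GenericIcebergWriter {
    private final long completionWatermarkMillis;

    IcebergWriterWithCompletenessCheck(long completionWatermarkMillis) {
      this.completionWatermarkMillis = completionWatermarkMillis;
    }

    // completeness-specific path: assumes a single date partition column and
    // derives the extra "late" value from the completion watermark
    String partitionValueFor(String datepartition, long epochMillis) {
      boolean late = epochMillis < completionWatermarkMillis;
      return partitionValueFor(datepartition) + (late ? ",late=1" : ",late=0");
    }
  }
}
```

This keeps the generic writer usable for tables with arbitrary partition layouts, while the completeness variant can document its assumptions explicitly.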

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
##########
@@ -265,6 +313,9 @@ public void write(GobblinMetadataChangeEvent gmce, Map<String, Collection<HiveSp
           return;
         }
         table = createTable(gmce, tableSpec);
+        if(tableMetadata.completenessEnabled) {

Review comment:
       Adding this here means we only support the completeness check for newly onboarded tables; is that intentional?
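
If existing tables should be covered too, one option (a control-flow sketch only; all names here are hypothetical stand-ins for the real write path) is to run the completeness setup on every write rather than only inside the create-table branch:

```java
// Hypothetical control-flow sketch: run the completeness setup outside the
// create-table branch so already-onboarded tables get the "late" column too.
public class WritePathSketch {
  boolean tableExists;
  boolean completenessEnabled;
  boolean lateColumnAdded;

  void write() {
    if (!tableExists) {
      tableExists = true; // stand-in for createTable(gmce, tableSpec)
    }
    // moved out of the create branch: applies to new AND existing tables
    if (completenessEnabled && !lateColumnAdded) {
      lateColumnAdded = true; // stand-in for adding the "late" partition column
    }
  }
}
```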




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

