This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 10397ab9d Allow extension of functions in GobblinMCEPublisher and customization of fileList file metrics are calculated for (#3820)
10397ab9d is described below

commit 10397ab9d85829b86f1a04958742e612dcff234b
Author: Andy Jiang <[email protected]>
AuthorDate: Mon Nov 6 16:15:22 2023 -0800

    Allow extension of functions in GobblinMCEPublisher and customization of fileList file metrics are calculated for (#3820)
---
 .../gobblin/iceberg/publisher/GobblinMCEPublisher.java       | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index ffeefaa08..eceece3b0 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.stream.Collectors;
@@ -83,7 +84,6 @@ public class GobblinMCEPublisher extends DataPublisher {
   public static final String SERIALIZED_AUDIT_COUNT_MAP_KEY = "serializedAuditCountMap";
 
   public GobblinMCEPublisher(State state) throws IOException {
-
     this(state, GobblinMCEProducer.getGobblinMCEProducer(state));
   }
 
@@ -96,7 +96,7 @@ public class GobblinMCEPublisher extends DataPublisher {
   public void publishData(Collection<? extends WorkUnitState> states) throws IOException {
     // First aggregate the new files by partition
     for (State state : states) {
-      Map<Path, Metrics> newFiles = computeFileMetrics(state);
+      Map<Path, Metrics> newFiles = computeFileMetrics(state, state.getPropAsList(NEW_FILES_LIST, ""));
       Map<String, String> offsetRange = getPartitionOffsetRange(OFFSET_RANGE_KEY);
       if (newFiles.isEmpty()) {
         // There'll be only one dummy file here. This file is parsed for DB and table name calculation.
@@ -114,7 +114,7 @@ public class GobblinMCEPublisher extends DataPublisher {
     }
   }
 
-  private Map<String, String> getPartitionOffsetRange(String offsetKey) {
+  protected Map<String, String> getPartitionOffsetRange(String offsetKey) {
     return state.getPropAsList(offsetKey)
         .stream()
         .collect(Collectors.toMap(s -> s.split(MAP_DELIMITER_KEY)[0], s -> s.split(MAP_DELIMITER_KEY)[1]));
@@ -125,11 +125,11 @@ public class GobblinMCEPublisher extends DataPublisher {
    * and calculate the hive spec for each datafile and submit the task to register that datafile
    * @throws IOException
    */
-  private Map<Path, Metrics> computeFileMetrics(State state) throws IOException {
+  protected Map<Path, Metrics> computeFileMetrics(State state, List<String> fileList) throws IOException {
     Map<Path, Metrics> newFiles = new HashMap<>();
     NameMapping mapping = getNameMapping();
     FileSystem fs = FileSystem.get(conf);
-    for (final String pathString : state.getPropAsList(NEW_FILES_LIST, "")) {
+    for (final String pathString : fileList) {
       Path path = new Path(pathString);
       LinkedList<FileStatus> fileStatuses = new LinkedList<>();
       fileStatuses.add(fs.getFileStatus(path));
@@ -153,7 +153,7 @@ public class GobblinMCEPublisher extends DataPublisher {
    * It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the DB and table name.
    * @throws IOException
    */
-  private Map<Path, Metrics> computeDummyFile(State state) throws IOException {
+  protected Map<Path, Metrics> computeDummyFile(State state) throws IOException {
     Map<Path, Metrics> newFiles = new HashMap<>();
     FileSystem fs = FileSystem.get(conf);
     if (!state.contains(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR)) {

Reply via email to