This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 10397ab9d Allow extension of methods in GobblinMCEPublisher and
customization of the file list for which file metrics are calculated (#3820)
10397ab9d is described below
commit 10397ab9d85829b86f1a04958742e612dcff234b
Author: Andy Jiang <[email protected]>
AuthorDate: Mon Nov 6 16:15:22 2023 -0800
Allow extension of methods in GobblinMCEPublisher and customization of
the file list for which file metrics are calculated (#3820)
---
.../gobblin/iceberg/publisher/GobblinMCEPublisher.java | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index ffeefaa08..eceece3b0 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.stream.Collectors;
@@ -83,7 +84,6 @@ public class GobblinMCEPublisher extends DataPublisher {
public static final String SERIALIZED_AUDIT_COUNT_MAP_KEY =
"serializedAuditCountMap";
public GobblinMCEPublisher(State state) throws IOException {
-
this(state, GobblinMCEProducer.getGobblinMCEProducer(state));
}
@@ -96,7 +96,7 @@ public class GobblinMCEPublisher extends DataPublisher {
public void publishData(Collection<? extends WorkUnitState> states) throws
IOException {
// First aggregate the new files by partition
for (State state : states) {
- Map<Path, Metrics> newFiles = computeFileMetrics(state);
+ Map<Path, Metrics> newFiles = computeFileMetrics(state,
state.getPropAsList(NEW_FILES_LIST, ""));
Map<String, String> offsetRange =
getPartitionOffsetRange(OFFSET_RANGE_KEY);
if (newFiles.isEmpty()) {
// There'll be only one dummy file here. This file is parsed for DB
and table name calculation.
@@ -114,7 +114,7 @@ public class GobblinMCEPublisher extends DataPublisher {
}
}
- private Map<String, String> getPartitionOffsetRange(String offsetKey) {
+ protected Map<String, String> getPartitionOffsetRange(String offsetKey) {
return state.getPropAsList(offsetKey)
.stream()
.collect(Collectors.toMap(s -> s.split(MAP_DELIMITER_KEY)[0], s ->
s.split(MAP_DELIMITER_KEY)[1]));
@@ -125,11 +125,11 @@ public class GobblinMCEPublisher extends DataPublisher {
* and calculate the hive spec for each datafile and submit the task to
register that datafile
* @throws IOException
*/
- private Map<Path, Metrics> computeFileMetrics(State state) throws
IOException {
+ protected Map<Path, Metrics> computeFileMetrics(State state, List<String>
fileList) throws IOException {
Map<Path, Metrics> newFiles = new HashMap<>();
NameMapping mapping = getNameMapping();
FileSystem fs = FileSystem.get(conf);
- for (final String pathString : state.getPropAsList(NEW_FILES_LIST, "")) {
+ for (final String pathString : fileList) {
Path path = new Path(pathString);
LinkedList<FileStatus> fileStatuses = new LinkedList<>();
fileStatuses.add(fs.getFileStatus(path));
@@ -153,7 +153,7 @@ public class GobblinMCEPublisher extends DataPublisher {
* It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the
DB and table name.
* @throws IOException
*/
- private Map<Path, Metrics> computeDummyFile(State state) throws IOException {
+ protected Map<Path, Metrics> computeDummyFile(State state) throws
IOException {
Map<Path, Metrics> newFiles = new HashMap<>();
FileSystem fs = FileSystem.get(conf);
if (!state.contains(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR)) {