pramodbiligiri commented on code in PR #6665:
URL: https://github.com/apache/hudi/pull/6665#discussion_r979584172


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/FilePathsFetcher.java:
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers.gcs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import java.io.IOException;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.SELECT_RELATIVE_PATH_PREFIX;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.IGNORE_RELATIVE_PATH_PREFIX;
+import static 
org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.IGNORE_RELATIVE_PATH_SUBSTR;
+
+/**
+ * Extracts a list of fully qualified GCS filepaths from a given Spark Dataset 
as input.
+ * Optionally:
+ * i) Match the filename and path against provided input filter strings
+ * ii) Check if each file exists on GCS, in which case it assumes SparkContext 
is already
+ * configured with GCS options through 
GcsEventsHoodieIncrSource.addGcsAccessConfs().
+ */
+public class FilePathsFetcher implements Serializable {
+
+  /**
+   * The default file format to assume if {@link GcsIngestionConfig#GCS_INCR_DATAFILE_EXTENSION} is not given.
+   */
+  private final String fileFormat;
+  private final TypedProperties props;
+
+  private static final String GCS_PREFIX = "gs://";
+  private static final long serialVersionUID = 1L;
+
+  private static final Logger LOG = 
LogManager.getLogger(FilePathsFetcher.class);
+
/**
 * Creates a fetcher configured from the given properties.
 *
 * @param props      Configuration properties driving path selection and filtering.
 * @param fileFormat The default file format to assume if
 *                   {@link GcsIngestionConfig#GCS_INCR_DATAFILE_EXTENSION} is not given.
 */
public FilePathsFetcher(TypedProperties props, String fileFormat) {
  this.fileFormat = fileFormat;
  this.props = props;
}
+
+  /**
+   * @param sourceForFilenames a Dataset that contains metadata about files on 
GCS. Assumed to be a persisted form
+   *                           of a Cloud Storage Pubsub Notification event.
+   * @param checkIfExists      Check if each file exists, before returning its 
full path
+   * @return A list of fully qualifieed GCS file paths.
+   */
+  public List<String> getGcsFilePaths(JavaSparkContext jsc, Dataset<Row> 
sourceForFilenames, boolean checkIfExists) {
+    String filter = createFilter();
+    LOG.info("Adding filter string to Dataset: " + filter);
+
+    SerializableConfiguration serializableConfiguration = new 
SerializableConfiguration(
+            jsc.hadoopConfiguration());
+
+    return sourceForFilenames
+            .filter(filter)
+            .select("bucket", "name")
+            .distinct()
+            .rdd().toJavaRDD().mapPartitions(
+                    getCloudFilesPerPartition(serializableConfiguration, 
checkIfExists)
+            ).collect();
+  }
+
+  private FlatMapFunction<Iterator<Row>, String> getCloudFilesPerPartition(
+          SerializableConfiguration serializableConfiguration, boolean 
checkIfExists) {
+
+    return rows -> {
+      List<String> cloudFilesPerPartition = new ArrayList<>();
+      rows.forEachRemaining(row -> {
+        addFileToList(row, cloudFilesPerPartition, serializableConfiguration, 
checkIfExists);
+      });
+
+      return cloudFilesPerPartition.iterator();
+    };
+  }
+
+  private void addFileToList(Row row, List<String> cloudFilesPerPartition,
+                             SerializableConfiguration 
serializableConfiguration, boolean checkIfExists) {
+    final Configuration configuration = serializableConfiguration.newCopy();
+
+    String bucket = row.getString(0);
+    String filePath = GCS_PREFIX + bucket + "/" + row.getString(1);
+
+    try {
+      addCloudFile(GCS_PREFIX, bucket, filePath, cloudFilesPerPartition, 
configuration, checkIfExists);
+    } catch (Exception exception) {
+      LOG.warn(String.format("Failed to add cloud file %s", filePath), 
exception);
+      throw new HoodieException(String.format("Failed to add cloud file %s", 
filePath), exception);
+    }
+  }
+
+  private void addCloudFile(String gcsPrefix, String bucket, String filePath,

Review Comment:
   Moving these to CloudObjectsSelector. Hoping the changes work after I build 
:D



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to