pramodbiligiri commented on code in PR #6665: URL: https://github.com/apache/hudi/pull/6665#discussion_r979584172
########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/FilePathsFetcher.java: ########## @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers.gcs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import java.io.IOException; +import java.io.Serializable; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import static 
org.apache.hudi.common.util.StringUtils.isNullOrEmpty; +import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.GCS_INCR_DATAFILE_EXTENSION; +import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.SELECT_RELATIVE_PATH_PREFIX; +import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.IGNORE_RELATIVE_PATH_PREFIX; +import static org.apache.hudi.utilities.sources.helpers.gcs.GcsIngestionConfig.IGNORE_RELATIVE_PATH_SUBSTR; + +/** + * Extracts a list of fully qualified GCS filepaths from a given Spark Dataset as input. + * Optionally: + * i) Match the filename and path against provided input filter strings + * ii) Check if each file exists on GCS, in which case it assumes SparkContext is already + * configured with GCS options through GcsEventsHoodieIncrSource.addGcsAccessConfs(). + */ +public class FilePathsFetcher implements Serializable { + + /** + * The default file format to assume if {@link GcsIngestionConfig#GCS_INCR_DATAFILE_EXTENSION} is not given. + */ + private final String fileFormat; + private final TypedProperties props; + + private static final String GCS_PREFIX = "gs://"; + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LogManager.getLogger(FilePathsFetcher.class); + + /** + * @param fileFormat The default file format to assume if {@link GcsIngestionConfig#GCS_INCR_DATAFILE_EXTENSION} + * is not given. + */ + public FilePathsFetcher(TypedProperties props, String fileFormat) { + this.props = props; + this.fileFormat = fileFormat; + } + + /** + * @param sourceForFilenames a Dataset that contains metadata about files on GCS. Assumed to be a persisted form + * of a Cloud Storage Pubsub Notification event. + * @param checkIfExists Check if each file exists, before returning its full path + * @return A list of fully qualified GCS file paths. 
+ */ + public List<String> getGcsFilePaths(JavaSparkContext jsc, Dataset<Row> sourceForFilenames, boolean checkIfExists) { + String filter = createFilter(); + LOG.info("Adding filter string to Dataset: " + filter); + + SerializableConfiguration serializableConfiguration = new SerializableConfiguration( + jsc.hadoopConfiguration()); + + return sourceForFilenames + .filter(filter) + .select("bucket", "name") + .distinct() + .rdd().toJavaRDD().mapPartitions( + getCloudFilesPerPartition(serializableConfiguration, checkIfExists) + ).collect(); + } + + private FlatMapFunction<Iterator<Row>, String> getCloudFilesPerPartition( + SerializableConfiguration serializableConfiguration, boolean checkIfExists) { + + return rows -> { + List<String> cloudFilesPerPartition = new ArrayList<>(); + rows.forEachRemaining(row -> { + addFileToList(row, cloudFilesPerPartition, serializableConfiguration, checkIfExists); + }); + + return cloudFilesPerPartition.iterator(); + }; + } + + private void addFileToList(Row row, List<String> cloudFilesPerPartition, + SerializableConfiguration serializableConfiguration, boolean checkIfExists) { + final Configuration configuration = serializableConfiguration.newCopy(); + + String bucket = row.getString(0); + String filePath = GCS_PREFIX + bucket + "/" + row.getString(1); + + try { + addCloudFile(GCS_PREFIX, bucket, filePath, cloudFilesPerPartition, configuration, checkIfExists); + } catch (Exception exception) { + LOG.warn(String.format("Failed to add cloud file %s", filePath), exception); + throw new HoodieException(String.format("Failed to add cloud file %s", filePath), exception); + } + } + + private void addCloudFile(String gcsPrefix, String bucket, String filePath, Review Comment: Moving these to CloudObjectsSelector. Hoping the changes work after I build :D -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
