pramodbiligiri commented on code in PR #6665: URL: https://github.com/apache/hudi/pull/6665#discussion_r980763426
########## hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java: ########## @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources.helpers; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Row; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Generic helper methods to fetch from Cloud Storage during incremental fetch from cloud storage buckets. + * NOTE: DO NOT use any implementation specific classes here. 
This class is supposed to across S3EventsSource, + * GcsEventsSource etc...so you can't assume the classes for your specific implementation will be available here. + */ +public class CloudObjectsSelectorCommon { + + private static final Logger LOG = LogManager.getLogger(CloudObjectsSelectorCommon.class); + + /** + * Return a function that extracts filepaths from a list of Rows. + * Here Row is assumed to have the schema [bucket_name, filepath_relative_to_bucket] + * @param storageUrlSchemePrefix Eg: s3:// or gs://. The storage-provider-specific prefix to use within the URL. + * @param serializableConfiguration + * @param checkIfExists check if each file exists, before adding it to the returned list + * @return + */ + public static FlatMapFunction<Iterator<Row>, String> getCloudFilesPerPartition( + String storageUrlSchemePrefix, SerializableConfiguration serializableConfiguration, boolean checkIfExists) { + return rows -> { + List<String> cloudFilesPerPartition = new ArrayList<>(); + rows.forEachRemaining(row -> { + addFileToList(row, cloudFilesPerPartition, storageUrlSchemePrefix, serializableConfiguration, checkIfExists); + }); + + return cloudFilesPerPartition.iterator(); + }; + } + + /** + * Extract a filepath from a given Row, and add it to a list. Optionally check if the file exists. + * Here Row is assumed to have the schema [bucket_name, filepath_relative_to_bucket] + * @param storageUrlSchemePrefix Eg: s3:// or gs://. The storage-provider-specific prefix to use within the URL. + */ + private static void addFileToList(Row row, List<String> filePathUrls, String storageUrlSchemePrefix, Review Comment: Didn't understand the question. What do you mean by return the output file - a Java File object? The filePathUrls will be of the form gs://bucket-name/folder-name/file.json. At most this function checks if the file exists, for which it needs the gcs-connector type libs on the classpath. There's a separate part of the code which fetches the data of the file. 
I'm updating the Javadoc of that function to be like below. Do you think this is sufficient? /** * Construct a fully qualified file path from a given Row, and add it to a list. Optionally check if the file exists. * The checkIfExists part assumes the relevant impl classes for the storageUrlSchemePrefix are already present * on the classpath! * Here Row is assumed to have the schema [bucket_name, filepath_relative_to_bucket] * @param storageUrlSchemePrefix Eg: s3:// or gs://. The storage-provider-specific prefix to use within the URL. */ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
