nsivabalan commented on code in PR #8290:
URL: https://github.com/apache/hudi/pull/8290#discussion_r1152618564


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/gcs/GcsObjectsFetcher.java:
##########
@@ -21,29 +21,33 @@
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon;
+import org.apache.hudi.utilities.sources.helpers.CloudObject;
+
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 
 import java.io.Serializable;
 import java.util.List;
+
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
+import static org.apache.hudi.utilities.sources.helpers.CloudObjectsSelectorCommon.getCloudObjectsPerPartition;
 import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.CLOUD_DATAFILE_EXTENSION;
 import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.IGNORE_RELATIVE_PATH_PREFIX;
 import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.IGNORE_RELATIVE_PATH_SUBSTR;
 import static org.apache.hudi.utilities.sources.helpers.CloudStoreIngestionConfig.SELECT_RELATIVE_PATH_PREFIX;
 
 /**
- * Extracts a list of fully qualified GCS filepaths from a given Spark Dataset as input.
+ * Extracts a list of GCS {@link CloudObject} containing filepaths from a given Spark Dataset as input.
  * Optionally:
  * i) Match the filename and path against provided input filter strings
  * ii) Check if each file exists on GCS, in which case it assumes SparkContext is already
  * configured with GCS options through GcsEventsHoodieIncrSource.addGcsAccessConfs().
  */
-public class FilePathsFetcher implements Serializable {
+public class GcsObjectsFetcher implements Serializable {

Review Comment:
   The names GcsObjectsFetcher and GcsObjectsDataFetcher are still a bit confusing. 
   Can we rename them to:
   GcsObjectsPathFetcher
   GcsObjectsDataFetcher



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java:
##########
@@ -115,4 +139,41 @@ private static boolean checkIfFileExists(String storageUrlSchemePrefix, String b
       throw new HoodieIOException(errMsg, ioe);
     }
   }
+
+  public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObject> cloudObjects, TypedProperties props, String fileFormat) {
+    LOG.debug("Extracted distinct files " + cloudObjects.size()
+        + " and some samples " + cloudObjects.stream().map(CloudObject::getPath).limit(10).collect(Collectors.toList()));
+
+    if (isNullOrEmpty(cloudObjects)) {
+      return Option.empty();
+    }
+    DataFrameReader reader = spark.read().format(fileFormat);
+    String datasourceOpts = props.getString(SPARK_DATASOURCE_OPTIONS, null);
+    if (StringUtils.isNullOrEmpty(datasourceOpts)) {
+      // fall back to legacy config for BWC. TODO consolidate in HUDI-5780
+      datasourceOpts = props.getString(S3EventsHoodieIncrSource.Config.SPARK_DATASOURCE_OPTIONS, null);
+    }
+    if (StringUtils.nonEmpty(datasourceOpts)) {
+      final ObjectMapper mapper = new ObjectMapper();
+      Map<String, String> sparkOptionsMap = null;
+      try {
+        sparkOptionsMap = mapper.readValue(datasourceOpts, Map.class);
+      } catch (IOException e) {
+        throw new HoodieException(String.format("Failed to parse sparkOptions: %s", datasourceOpts), e);
+      }
+      LOG.info(String.format("sparkOptions loaded: %s", sparkOptionsMap));
+      reader = reader.options(sparkOptionsMap);
+    }
+    List<String> paths = new ArrayList<>();
+    long totalSize = 0;
+    for (CloudObject o: cloudObjects) {
+      paths.add(o.getPath());
+      totalSize += o.getSize();
+    }
+    // inflate 10% for potential hoodie meta fields
+    totalSize *= 1.1;
+    long parquetMaxFileSize = props.getLong(PARQUET_MAX_FILE_SIZE.key(), Long.parseLong(PARQUET_MAX_FILE_SIZE.defaultValue()));
+    int numPartitions = (int) Math.max(totalSize / parquetMaxFileSize, 1);
+    return Option.of(reader.load(paths.toArray(new String[cloudObjects.size()])).coalesce(numPartitions));

Review Comment:
   Don't we need to repartition here? 
   coalesce [may not increase](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.coalesce.html) the number of partitions.
   For example, if we have one 1GB file from S3, we want to repartition so that we get 10 Spark partitions. 
   
   Maybe we can optimize based on total size and max Parquet size: if it's less, we can do coalesce; if it's higher, we can do repartition. 
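   
   A minimal sketch of that decision, assuming we compare the size-based target against the partition count the reader actually produced. `PartitionPlanner` and its methods are hypothetical names, not existing Hudi code, and the 100MB max file size is only an illustrative value chosen to reproduce the 1GB example above. The Spark calls are left in comments so the snippet runs without Spark on the classpath.

```java
/** Hypothetical helper for illustration; not part of Hudi. */
final class PartitionPlanner {
  enum Action { COALESCE, REPARTITION, NONE }

  /** Same floor division as in the diff: totalSize / maxFileSize, but never below 1. */
  static int targetPartitions(long totalSizeBytes, long maxFileSizeBytes) {
    long n = Math.max(totalSizeBytes / maxFileSizeBytes, 1L);
    return (int) Math.min(n, Integer.MAX_VALUE);
  }

  /** coalesce can only reduce the partition count, so growing it needs repartition. */
  static Action choose(int currentPartitions, int targetPartitions) {
    if (targetPartitions < currentPartitions) {
      return Action.COALESCE;    // e.g. dataset.coalesce(targetPartitions), no shuffle
    }
    if (targetPartitions > currentPartitions) {
      return Action.REPARTITION; // e.g. dataset.repartition(targetPartitions), full shuffle
    }
    return Action.NONE;
  }

  public static void main(String[] args) {
    long oneGb = 1024L * 1024 * 1024;
    long maxFileSize = 100L * 1024 * 1024; // illustrative value, not the Hudi default
    int target = targetPartitions(oneGb, maxFileSize);
    // A single 1GB input that was read as one partition needs a repartition.
    System.out.println(target + " partitions via " + choose(1, target));
  }
}
```

   In the real method the current count would come from something like `dataset.rdd().getNumPartitions()`.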
   
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java:
##########
@@ -115,4 +139,41 @@ private static boolean checkIfFileExists(String storageUrlSchemePrefix, String b
       throw new HoodieIOException(errMsg, ioe);
     }
   }
+
+  public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObject> cloudObjects, TypedProperties props, String fileFormat) {
+    LOG.debug("Extracted distinct files " + cloudObjects.size()

Review Comment:
   Do you think we should guard this with Logger.isDebugEnabled()? 
   This might trigger the DAG, so I just want to be cautious.
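   
   A rough sketch of the guard. To keep it runnable without extra dependencies it uses `java.util.logging` (`isLoggable(Level.FINE)`) in place of log4j's `isDebugEnabled()`; the class name, the `samplesBuilt` counter, and the paths are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

/** Hypothetical example; java.util.logging stands in for log4j here. */
final class GuardedDebugLog {
  static final Logger LOG = Logger.getLogger(GuardedDebugLog.class.getName());
  // Counts how often the (potentially expensive) sample string is built.
  static int samplesBuilt = 0;

  static String sample(List<String> paths) {
    samplesBuilt++;
    return paths.stream().limit(10).collect(Collectors.joining(", "));
  }

  static void logFiles(List<String> paths) {
    // Without the guard, the message argument is evaluated even when
    // debug logging is off; with it, the work is skipped entirely.
    if (LOG.isLoggable(Level.FINE)) {
      LOG.fine("Extracted distinct files " + paths.size()
          + " and some samples " + sample(paths));
    }
  }

  public static void main(String[] args) {
    LOG.setLevel(Level.INFO); // debug-level logging disabled
    logFiles(Arrays.asList("gs://bucket/a.parquet", "gs://bucket/b.parquet"));
    System.out.println("samples built: " + samplesBuilt);
  }
}
```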



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java:
##########
@@ -115,4 +129,41 @@ private static boolean checkIfFileExists(String storageUrlSchemePrefix, String b
       throw new HoodieIOException(errMsg, ioe);
     }
   }
+
+  public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObject> cloudObjects, TypedProperties props, String fileFormat) {
+    LOG.debug("Extracted distinct files " + cloudObjects.size()
+        + " and some samples " + cloudObjects.stream().map(CloudObject::getPath).limit(10).collect(Collectors.toList()));
+
+    if (isNullOrEmpty(cloudObjects)) {
+      return Option.empty();
+    }
+    DataFrameReader reader = spark.read().format(fileFormat);
+    String datasourceOpts = props.getString(SPARK_DATASOURCE_OPTIONS, null);
+    if (StringUtils.isNullOrEmpty(datasourceOpts)) {
+      // fall back to legacy config for BWC. TODO consolidate in HUDI-5780
+      datasourceOpts = props.getString(S3EventsHoodieIncrSource.Config.SPARK_DATASOURCE_OPTIONS, null);
+    }
+    if (StringUtils.nonEmpty(datasourceOpts)) {
+      final ObjectMapper mapper = new ObjectMapper();
+      Map<String, String> sparkOptionsMap = null;
+      try {
+        sparkOptionsMap = mapper.readValue(datasourceOpts, Map.class);
+      } catch (IOException e) {
+        throw new HoodieException(String.format("Failed to parse sparkOptions: %s", datasourceOpts), e);
+      }
+      LOG.info(String.format("sparkOptions loaded: %s", sparkOptionsMap));
+      reader = reader.options(sparkOptionsMap);
+    }
+    List<String> paths = new ArrayList<>();
+    long totalSize = 0;
+    for (CloudObject o: cloudObjects) {
+      paths.add(o.getPath());
+      totalSize += o.getSize();
+    }
+    // inflate 10% for potential hoodie meta fields
+    totalSize *= 1.1;

Review Comment:
   I don't think we can use a flat 10%. Our meta fields overhead is not proportional to payload size. 
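   
   To illustrate with made-up numbers: if the meta fields add a roughly fixed number of bytes per record (100 bytes below is purely an assumption, not a measured Hudi value), the relative overhead depends on the record size rather than being a constant 10%.

```java
/** Hypothetical arithmetic sketch; the byte counts are illustrative assumptions. */
final class MetaOverheadDemo {
  /** Overhead as a fraction of payload when each record carries a fixed meta cost. */
  static double overheadFraction(long avgRecordBytes, long metaBytesPerRecord) {
    return (double) metaBytesPerRecord / avgRecordBytes;
  }

  public static void main(String[] args) {
    long metaBytesPerRecord = 100; // assumed fixed per-record cost
    for (long recordBytes : new long[] {100, 1_000, 10_000}) {
      System.out.printf("record=%dB overhead=%.0f%%%n",
          recordBytes, 100 * overheadFraction(recordBytes, metaBytesPerRecord));
    }
  }
}
```

   Under this assumption a flat 10% is only right for records of about 1KB; smaller records are underestimated and larger ones overestimated.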



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java:
##########
@@ -115,4 +129,41 @@ private static boolean checkIfFileExists(String storageUrlSchemePrefix, String b
       throw new HoodieIOException(errMsg, ioe);
     }
   }
+
+  public static Option<Dataset<Row>> loadAsDataset(SparkSession spark, List<CloudObject> cloudObjects, TypedProperties props, String fileFormat) {
+    LOG.debug("Extracted distinct files " + cloudObjects.size()
+        + " and some samples " + cloudObjects.stream().map(CloudObject::getPath).limit(10).collect(Collectors.toList()));
+
+    if (isNullOrEmpty(cloudObjects)) {
+      return Option.empty();
+    }
+    DataFrameReader reader = spark.read().format(fileFormat);
+    String datasourceOpts = props.getString(SPARK_DATASOURCE_OPTIONS, null);
+    if (StringUtils.isNullOrEmpty(datasourceOpts)) {
+      // fall back to legacy config for BWC. TODO consolidate in HUDI-5780
+      datasourceOpts = props.getString(S3EventsHoodieIncrSource.Config.SPARK_DATASOURCE_OPTIONS, null);
+    }
+    if (StringUtils.nonEmpty(datasourceOpts)) {
+      final ObjectMapper mapper = new ObjectMapper();
+      Map<String, String> sparkOptionsMap = null;
+      try {
+        sparkOptionsMap = mapper.readValue(datasourceOpts, Map.class);
+      } catch (IOException e) {
+        throw new HoodieException(String.format("Failed to parse sparkOptions: %s", datasourceOpts), e);
+      }
+      LOG.info(String.format("sparkOptions loaded: %s", sparkOptionsMap));
+      reader = reader.options(sparkOptionsMap);
+    }
+    List<String> paths = new ArrayList<>();
+    long totalSize = 0;
+    for (CloudObject o: cloudObjects) {
+      paths.add(o.getPath());
+      totalSize += o.getSize();
+    }
+    // inflate 10% for potential hoodie meta fields
+    totalSize *= 1.1;

Review Comment:
   We can get the average record size from the latest commit metadata, right? Is that doable?
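   
   Roughly what that could look like, assuming the average record size has already been read from the latest commit and is passed in as a plain number. `SizeEstimator`, its parameters, and the fallback behaviour are hypothetical; how the value is fetched from commit metadata is deliberately left out.

```java
/** Hypothetical sketch; not existing Hudi code. */
final class SizeEstimator {
  /**
   * Estimates the written size by adding a per-record meta cost to the source size.
   * Falls back to the raw source size when no average record size is available.
   */
  static long estimateWithMetaFields(long sourceBytes, long avgRecordBytes, long metaBytesPerRecord) {
    if (avgRecordBytes <= 0) {
      return sourceBytes; // e.g. first commit, nothing to derive the average from
    }
    long estimatedRecords = sourceBytes / avgRecordBytes;
    return sourceBytes + estimatedRecords * metaBytesPerRecord;
  }

  public static void main(String[] args) {
    // 1,000,000 source bytes, 1,000-byte records, 100 assumed meta bytes per record.
    System.out.println(estimateWithMetaFields(1_000_000L, 1_000L, 100L));
  }
}
```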



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
