codope commented on code in PR #9336:
URL: https://github.com/apache/hudi/pull/9336#discussion_r1284605124
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -267,6 +272,11 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline findInstantsBefore(String instantTime);
+ /**
+ * Finds the instant before specified time.
+ */
+ Option<HoodieInstant> findInstantBefore(String instantTime);
Review Comment:
how about reusing existing `findInstantsBefore`?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -241,6 +241,11 @@ public interface HoodieTimeline extends Serializable {
*/
HoodieTimeline findInstantsInRange(String startTs, String endTs);
+ /**
+ * Create a new Timeline with instants after or equals startTs and before or
on endTs.
+ */
+ HoodieTimeline findInstantsInClosedRange(String startTs, String endTs);
Review Comment:
Remove if not used.
Also, please add tests in TestHoodieActiveTimeline if adding new method to
the timeline.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
+import static
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL;
+
+public class QueryInfo {
Review Comment:
some javadoc for this class?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java:
##########
@@ -203,6 +204,12 @@ public HoodieDefaultTimeline findInstantsInRange(String
startTs, String endTs) {
getInstantsAsStream().filter(s ->
HoodieTimeline.isInRange(s.getTimestamp(), startTs, endTs)), details);
}
+ @Override
+ public HoodieDefaultTimeline findInstantsInClosedRange(String startTs,
String endTs) {
Review Comment:
let's remove this if not being used.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.exception.HoodieException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+
+/**
+ * This class is currently used only by the S3 and GCS incremental sources that support
size-based batching.
+ * This class will fetch committed files from the current commit to support
size-based batching.
+ */
+public class QueryRunner {
+ private final SparkSession sparkSession;
+ private final TypedProperties props;
Review Comment:
props is not being used... let's remove?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryRunner.java:
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.DataSourceUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.exception.HoodieException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
+
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.hudi.utilities.sources.HoodieIncrSource.Config.HOODIE_SRC_BASE_PATH;
+
+/**
+ * This class is currently used only by the S3 and GCS incremental sources that support
size-based batching.
+ * This class will fetch committed files from the current commit to support
size-based batching.
+ */
+public class QueryRunner {
+ private final SparkSession sparkSession;
+ private final TypedProperties props;
+ private final String sourcePath;
+
+ private static final Logger LOG = LoggerFactory.getLogger(QueryRunner.class);
+
+ public QueryRunner(SparkSession sparkSession, TypedProperties props) {
+ this.sparkSession = sparkSession;
+ this.props = props;
+ DataSourceUtils.checkRequiredProperties(props,
Collections.singletonList(HOODIE_SRC_BASE_PATH));
Review Comment:
let's use `HoodieIncrSourceConfig.HOODIE_SRC_BASE_PATH.key()` instead of
deprecated constant?
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java:
##########
@@ -413,6 +423,14 @@ static boolean isInRange(String timestamp, String startTs,
String endTs) {
&& HoodieTimeline.compareTimestamps(timestamp,
LESSER_THAN_OR_EQUALS, endTs);
}
+ /**
+ * Return true if specified timestamp is in range [startTs, endTs].
+ */
+ static boolean isInClosedRange(String timestamp, String startTs, String
endTs) {
Review Comment:
won't be needed if `findInstantsInClosedRange` is not being used.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java:
##########
@@ -40,10 +47,15 @@
import static
org.apache.hudi.common.table.timeline.TimelineUtils.handleHollowCommitIfNeeded;
import static
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY;
import static
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.sum;
public class IncrSourceHelper {
- private static final String DEFAULT_BEGIN_TIMESTAMP = "000";
+ private static final Logger LOG =
LoggerFactory.getLogger(IncrSourceHelper.class);
+ public static final String DEFAULT_BEGIN_TIMESTAMP = "000";
+ private static final String CUMULATIVE_COLUMN_NAME = "cumulativeSize";
+
Review Comment:
nit: remove one extra line.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/QueryInfo.java:
##########
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL;
+import static
org.apache.hudi.DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL;
+
+public class QueryInfo {
+ private final String queryType;
+ private final String previousInstant;
+ private final String startInstant;
+ private final String endInstant;
+ private final String orderColumn;
+ private final String keyColumn;
+ private final String limitColumn;
+ private final List<String> orderByColumns;
+
+ private static final Logger LOG = LoggerFactory.getLogger(QueryInfo.class);
Review Comment:
let's remove the logger.
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java:
##########
@@ -40,10 +47,15 @@
import static
org.apache.hudi.common.table.timeline.TimelineUtils.handleHollowCommitIfNeeded;
import static
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.MISSING_CHECKPOINT_STRATEGY;
import static
org.apache.hudi.utilities.config.HoodieIncrSourceConfig.READ_LATEST_INSTANT_ON_MISSING_CKPT;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.sum;
public class IncrSourceHelper {
- private static final String DEFAULT_BEGIN_TIMESTAMP = "000";
+ private static final Logger LOG =
LoggerFactory.getLogger(IncrSourceHelper.class);
+ public static final String DEFAULT_BEGIN_TIMESTAMP = "000";
Review Comment:
Any reason for keeping it to 3 digits only? What if instant parsing fails?
Can we reuse `INIT_INSTANT_TS` in HoodieTimeline?
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/IncrSourceHelper.java:
##########
@@ -106,17 +129,98 @@ public static Pair<String, Pair<String, String>>
calculateBeginAndEndInstants(Ja
}
});
+ String previousInstantTime = beginInstantTime;
+ if (!beginInstantTime.equals(DEFAULT_BEGIN_TIMESTAMP)) {
+ Option<HoodieInstant> previousInstant =
activeCommitTimeline.findInstantBefore(beginInstantTime);
+ if (previousInstant.isPresent()) {
+ previousInstantTime = previousInstant.get().getTimestamp();
+ }
+ }
+
if (missingCheckpointStrategy == MissingCheckpointStrategy.READ_LATEST ||
!activeCommitTimeline.isBeforeTimelineStarts(beginInstantTime)) {
- Option<HoodieInstant> nthInstant =
Option.fromJavaOptional(activeCommitTimeline
- .findInstantsAfter(beginInstantTime,
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
- return Pair.of(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(),
Pair.of(beginInstantTime,
nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime)));
+ Option<HoodieInstant> nthInstant;
+ // When we are in the upgrade code path from non-sourcelimit-based
batching to sourcelimit-based batching, we need to avoid fetching the commit
+ // that is read already. Else we will have duplicates in append-only use
case if we use "findInstantsAfterOrEquals".
+ // As soon as we have a new format of checkpoint and a key we will move
to the new code of fetching the current commit as well.
+ if (sourceLimitBasedBatching && lastCheckpointKey.isPresent()) {
+ nthInstant = Option.fromJavaOptional(activeCommitTimeline
+ .findInstantsAfterOrEquals(beginInstantTime,
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
+ } else {
+ nthInstant = Option.fromJavaOptional(activeCommitTimeline
+ .findInstantsAfter(beginInstantTime,
numInstantsPerFetch).getInstantsAsStream().reduce((x, y) -> y));
+ }
+ return new
QueryInfo(DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL(),
previousInstantTime,
+ beginInstantTime,
nthInstant.map(HoodieInstant::getTimestamp).orElse(beginInstantTime),
+ orderColumn, keyColumn, limitColumn);
} else {
// when MissingCheckpointStrategy is set to read everything until
latest, trigger snapshot query.
Option<HoodieInstant> lastInstant = activeCommitTimeline.lastInstant();
- return Pair.of(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(),
Pair.of(beginInstantTime, timestampForLastInstant.apply(lastInstant.get())));
+ return new QueryInfo(DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL(),
+ previousInstantTime, beginInstantTime,
lastInstant.get().getTimestamp(),
+ orderColumn, keyColumn, limitColumn);
}
}
+ /**
+ * Adjust the source dataset to size based batch based on last checkpoint
key.
+ *
+ * @param sourceData Source dataset
+ * @param sourceLimit Max number of bytes to be read from source
+ * @param queryInfo Query Info
+ * @return end instants along with filtered rows.
+ */
+ public static Pair<CloudObjectIncrCheckpoint, Dataset<Row>>
filterAndGenerateCheckpointBasedOnSourceLimit(Dataset<Row> sourceData,
+
long sourceLimit, QueryInfo queryInfo,
+
CloudObjectIncrCheckpoint
cloudObjectIncrCheckpoint) {
+ if (sourceData.isEmpty()) {
+ LOG.info("Empty source, returning endpoint:" +
queryInfo.getEndInstant());
+ return Pair.of(cloudObjectIncrCheckpoint, sourceData);
+ }
+ // Let's persist the dataset to avoid triggering the dag repeatedly
+ sourceData.persist(StorageLevel.MEMORY_AND_DISK());
+ // Set ordering in query to enable batching
+ Dataset<Row> orderedDf = QueryRunner.applyOrdering(sourceData,
queryInfo.getOrderByColumns());
+ Option<String> lastCheckpoint =
Option.of(cloudObjectIncrCheckpoint.getCommit());
+ Option<String> lastCheckpointKey =
Option.ofNullable(cloudObjectIncrCheckpoint.getKey());
+ Option<String> concatenatedKey = lastCheckpoint.flatMap(checkpoint ->
lastCheckpointKey.map(key -> checkpoint + key));
+
+ // Filter until last checkpoint key
+ if (concatenatedKey.isPresent()) {
+ orderedDf = orderedDf.withColumn("commit_key",
+ functions.concat(functions.col(queryInfo.getOrderColumn()),
functions.col(queryInfo.getKeyColumn())));
+ // Apply incremental filter
+ orderedDf =
orderedDf.filter(functions.col("commit_key").gt(concatenatedKey.get())).drop("commit_key");
+ // We could be just at the end of the commit, so return empty
+ if (orderedDf.isEmpty()) {
+ LOG.info("Empty ordered source, returning endpoint:" +
queryInfo.getEndInstant());
+ sourceData.unpersist();
+ return Pair.of(new
CloudObjectIncrCheckpoint(queryInfo.getEndInstant(), lastCheckpointKey.get()),
orderedDf);
+ }
+ }
+
+ // Limit based on sourceLimit
+ WindowSpec windowSpec = Window.orderBy(col(queryInfo.getOrderColumn()),
col(queryInfo.getKeyColumn()));
+ // Add the 'cumulativeSize' column with running sum of 'limitColumn'
+ Dataset<Row> aggregatedData = orderedDf.withColumn(CUMULATIVE_COLUMN_NAME,
+ sum(col(queryInfo.getLimitColumn())).over(windowSpec));
+ Dataset<Row> collectedRows =
aggregatedData.filter(col(CUMULATIVE_COLUMN_NAME).leq(sourceLimit));
+
+ Row row = null;
+ if (collectedRows.isEmpty()) {
+ // If the first element itself exceeds limits then return first element
+ LOG.info("First object exceeding source limit: " + sourceLimit + "
bytes");
+ row = aggregatedData.select(queryInfo.getOrderColumn(),
queryInfo.getKeyColumn(), CUMULATIVE_COLUMN_NAME).first();
+ collectedRows = aggregatedData.limit(1);
+ } else {
+ // Get the last row and form composite key
+ row = collectedRows.select(queryInfo.getOrderColumn(),
queryInfo.getKeyColumn(), CUMULATIVE_COLUMN_NAME).orderBy(
+ col(queryInfo.getOrderColumn()).desc(),
col(queryInfo.getKeyColumn()).desc()).first();
+ }
+ LOG.info("Processed batch size: " + row.getLong(2) + " bytes");
+ sourceData.unpersist();
+ return Pair.of(new CloudObjectIncrCheckpoint(row.getString(0),
row.getString(1)), collectedRows);
+ }
+
/**
* Validate instant time seen in the incoming row.
Review Comment:
the method `validateInstantTime` is not being used. let's remove.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java:
##########
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.collection.Triple;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.utilities.sources.helpers.CloudDataFetcher;
+import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
+import org.apache.hudi.utilities.sources.helpers.QueryRunner;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
+import static
org.apache.hudi.utilities.sources.helpers.IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class TestS3EventsHoodieIncrSource extends
SparkClientFunctionalTestHarness {
+ private static final Schema S3_METADATA_SCHEMA =
SchemaTestUtil.getSchemaFromResource(
+ TestS3EventsHoodieIncrSource.class, "/streamer-config/s3-metadata.avsc",
true);
+
+ private ObjectMapper mapper = new ObjectMapper();
+
+ private static final String MY_BUCKET = "some-bucket";
+
+ @Mock
+ private SchemaProvider mockSchemaProvider;
+ @Mock
+ QueryRunner mockQueryRunner;
+ @Mock
+ CloudDataFetcher mockCloudDataFetcher;
+ private JavaSparkContext jsc;
+ private HoodieTableMetaClient metaClient;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
+ metaClient = getHoodieMetaClient(hadoopConf(), basePath());
+ }
+
+ private List<String> getSampleS3ObjectKeys(List<Triple<String, Long,
String>> filePathSizeAndCommitTime) {
+ return filePathSizeAndCommitTime.stream().map(f -> {
+ try {
+ return generateS3EventMetadata(f.getMiddle(), MY_BUCKET, f.getLeft(),
f.getRight());
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+ }
+
+ private Dataset<Row> generateDataset(List<Triple<String, Long, String>>
filePathSizeAndCommitTime) {
+ JavaRDD<String> testRdd =
jsc.parallelize(getSampleS3ObjectKeys(filePathSizeAndCommitTime), 2);
+ Dataset<Row> inputDs = spark().read().json(testRdd);
+ return inputDs;
+ }
+
+ /**
+ * Generates simple Json structure like below
+ * <p>
+ * s3 : {
+ * object : {
+ * size:
+ * key:
+ * }
+ * bucket: {
+ * name:
+ * }
+ */
+ private String generateS3EventMetadata(Long objectSize, String bucketName,
String objectKey, String commitTime)
+ throws JsonProcessingException {
+ Map<String, Object> objectMetadata = new HashMap<>();
+ objectMetadata.put("size", objectSize);
+ objectMetadata.put("key", objectKey);
+ Map<String, String> bucketMetadata = new HashMap<>();
+ bucketMetadata.put("name", bucketName);
+ Map<String, Object> s3Metadata = new HashMap<>();
+ s3Metadata.put("object", objectMetadata);
+ s3Metadata.put("bucket", bucketMetadata);
+ Map<String, Object> eventMetadata = new HashMap<>();
+ eventMetadata.put("s3", s3Metadata);
+ eventMetadata.put("_hoodie_commit_time", commitTime);
+ return mapper.writeValueAsString(eventMetadata);
+ }
+
+ private HoodieRecord generateS3EventMetadata(String commitTime, String
bucketName, String objectKey, Long objectSize) {
+ String partitionPath = bucketName;
+ Schema schema = S3_METADATA_SCHEMA;
+ GenericRecord rec = new GenericData.Record(schema);
+ Schema.Field s3Field = schema.getField("s3");
+ Schema s3Schema = s3Field.schema().getTypes().get(1); // Assuming the
record schema is the second type
+ // Create a generic record for the "s3" field
+ GenericRecord s3Record = new GenericData.Record(s3Schema);
+
+ Schema.Field s3BucketField = s3Schema.getField("bucket");
+ Schema s3Bucket = s3BucketField.schema().getTypes().get(1); // Assuming
the record schema is the second type
+ GenericRecord s3BucketRec = new GenericData.Record(s3Bucket);
+ s3BucketRec.put("name", bucketName);
+
+
+ Schema.Field s3ObjectField = s3Schema.getField("object");
+ Schema s3Object = s3ObjectField.schema().getTypes().get(1); // Assuming
the record schema is the second type
+ GenericRecord s3ObjectRec = new GenericData.Record(s3Object);
+ s3ObjectRec.put("key", objectKey);
+ s3ObjectRec.put("size", objectSize);
+
+ s3Record.put("bucket", s3BucketRec);
+ s3Record.put("object", s3ObjectRec);
+ rec.put("s3", s3Record);
+ rec.put("_hoodie_commit_time", commitTime);
+
+ HoodieAvroPayload payload = new HoodieAvroPayload(Option.of(rec));
+ return new HoodieAvroRecord(new HoodieKey(objectKey, partitionPath),
payload);
+ }
+
+ private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy
missingCheckpointStrategy) {
+ Properties properties = new Properties();
+ properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path",
basePath());
+
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
+ missingCheckpointStrategy.name());
+
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.file.format",
"json");
Review Comment:
should we parameterize this and test other file formats? if so, we can take
it up even in a followup.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]