This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 350b0ec [HUDI-311] : Support for AWS Database Migration Service in
DeltaStreamer
350b0ec is described below
commit 350b0ecb4d137411c6231a1568add585c6d7b7d5
Author: vinoth chandar <[email protected]>
AuthorDate: Sun Dec 22 23:33:35 2019 -0800
[HUDI-311] : Support for AWS Database Migration Service in DeltaStreamer
- Add a transformer class, that adds `Op` field if not found in input frame
- Add a payload implementation, that issues deletes when Op=D
- Remove Parquet as a top level source type, consolidate with RowSource
- Made delta streamer work without a property file, simply using
overridden cli options
- Unit tests for transformer/payload classes
---
.../common/util/DFSPropertiesConfiguration.java | 15 ++-
.../org/apache/hudi/payload/AWSDmsAvroPayload.java | 68 +++++++++++++
.../org/apache/hudi/utilities/UtilHelpers.java | 18 +++-
.../deltastreamer/SourceFormatAdapter.java | 15 ---
.../utilities/schema/RowBasedSchemaProvider.java | 6 ++
.../hudi/utilities/sources/ParquetDFSSource.java | 20 ++--
.../org/apache/hudi/utilities/sources/Source.java | 2 +-
.../AWSDmsTransformer.java} | 32 ++++--
.../TestAWSDatabaseMigrationServiceSource.java | 107 +++++++++++++++++++++
.../apache/hudi/utilities/UtilitiesTestBase.java | 1 -
.../hudi/utilities/sources/TestDFSSource.java | 2 +-
11 files changed, 239 insertions(+), 47 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java
index 838d4b8..f535cac 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/DFSPropertiesConfiguration.java
@@ -60,6 +60,17 @@ public class DFSPropertiesConfiguration {
visitFile(rootFile);
}
+ public DFSPropertiesConfiguration(FileSystem fs, Path rootFile) {
+ this(fs, rootFile, new TypedProperties());
+ }
+
+ public DFSPropertiesConfiguration() {
+ this.fs = null;
+ this.rootFile = null;
+ this.props = new TypedProperties();
+ this.visitedFiles = new HashSet<>();
+ }
+
private String[] splitProperty(String line) {
int ind = line.indexOf('=');
String k = line.substring(0, ind).trim();
@@ -106,10 +117,6 @@ public class DFSPropertiesConfiguration {
}
}
- public DFSPropertiesConfiguration(FileSystem fs, Path rootFile) {
- this(fs, rootFile, new TypedProperties());
- }
-
public TypedProperties getConfig() {
return props;
}
diff --git
a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
new file mode 100644
index 0000000..09898ec
--- /dev/null
+++ b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.payload;
+
+import org.apache.hudi.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import java.io.IOException;
+
+/**
+ * Provides support for seamlessly applying changes captured via Amazon
Database Migration Service onto S3.
+ *
+ * Typically, we get the following pattern of full change records
corresponding to DML against the
+ * source database
+ *
+ * - Full load records with no `Op` field
+ * - For inserts against the source table, records contain full after image
with `Op=I`
+ * - For updates against the source table, records contain full after image
with `Op=U`
+ * - For deletes against the source table, records contain full before image
with `Op=D`
+ *
+ * This payload implementation will issue matching insert, delete, updates
against the hudi dataset
+ *
+ */
+public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {
+
+ public static final String OP_FIELD = "Op";
+
+ public AWSDmsAvroPayload(GenericRecord record, Comparable orderingVal) {
+ super(record, orderingVal);
+ }
+
+ public AWSDmsAvroPayload(Option<GenericRecord> record) {
+ this(record.get(), (record1) -> 0); // natural order
+ }
+
+ @Override
+ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema)
+ throws IOException {
+ IndexedRecord insertValue = getInsertValue(schema).get();
+ boolean delete = false;
+ if (insertValue instanceof GenericRecord) {
+ GenericRecord record = (GenericRecord) insertValue;
+ delete = record.get(OP_FIELD) != null &&
record.get(OP_FIELD).toString().equalsIgnoreCase("D");
+ }
+
+ return delete ? Option.empty() : Option.of(insertValue);
+ }
+}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 218934a..4cb56e9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -27,7 +27,7 @@ import org.apache.hudi.common.util.TypedProperties;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.Source;
@@ -92,16 +92,24 @@ public class UtilHelpers {
/**
*/
public static DFSPropertiesConfiguration readConfig(FileSystem fs, Path
cfgPath, List<String> overriddenProps) {
+ DFSPropertiesConfiguration conf;
+ try {
+ conf = new
DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath);
+ } catch (Exception e) {
+ conf = new DFSPropertiesConfiguration();
+ LOG.warn("Unexpected error read props file at :" + cfgPath, e);
+ }
+
try {
- DFSPropertiesConfiguration conf = new
DFSPropertiesConfiguration(cfgPath.getFileSystem(fs.getConf()), cfgPath);
if (!overriddenProps.isEmpty()) {
LOG.info("Adding overridden properties to file properties.");
conf.addProperties(new BufferedReader(new
StringReader(String.join("\n", overriddenProps))));
}
- return conf;
- } catch (Exception e) {
- throw new HoodieException("Unable to read props file at :" + cfgPath, e);
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Unexpected error adding config overrides",
ioe);
}
+
+ return conf;
}
public static TypedProperties buildProperties(List<String> props) {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index 65779e0..a21d263 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.sources.AvroSource;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.JsonSource;
-import org.apache.hudi.utilities.sources.ParquetSource;
import org.apache.hudi.utilities.sources.RowSource;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.sources.helpers.AvroConvertor;
@@ -60,8 +59,6 @@ public final class SourceFormatAdapter {
switch (source.getSourceType()) {
case AVRO:
return ((AvroSource) source).fetchNext(lastCkptStr, sourceLimit);
- case PARQUET:
- return ((ParquetSource) source).fetchNext(lastCkptStr, sourceLimit);
case JSON: {
InputBatch<JavaRDD<String>> r = ((JsonSource)
source).fetchNext(lastCkptStr, sourceLimit);
AvroConvertor convertor = new
AvroConvertor(r.getSchemaProvider().getSourceSchema());
@@ -102,18 +99,6 @@ public final class SourceFormatAdapter {
.orElse(null)),
r.getCheckpointForNextBatch(), r.getSchemaProvider());
}
- case PARQUET: {
- InputBatch<JavaRDD<GenericRecord>> r = ((ParquetSource)
source).fetchNext(lastCkptStr, sourceLimit);
- Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
- return new InputBatch<>(
- Option
- .ofNullable(
- r.getBatch()
- .map(rdd ->
AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
- source.getSparkSession()))
- .orElse(null)),
- r.getCheckpointForNextBatch(), r.getSchemaProvider());
- }
case JSON: {
InputBatch<JavaRDD<String>> r = ((JsonSource)
source).fetchNext(lastCkptStr, sourceLimit);
Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
index 4b708fa..6c8a3d0 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
@@ -19,8 +19,10 @@
package org.apache.hudi.utilities.schema;
import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.common.util.TypedProperties;
import org.apache.avro.Schema;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.types.StructType;
public class RowBasedSchemaProvider extends SchemaProvider {
@@ -31,6 +33,10 @@ public class RowBasedSchemaProvider extends SchemaProvider {
private StructType rowStruct;
+ public RowBasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
+ super(props, jssc);
+ }
+
public RowBasedSchemaProvider(StructType rowStruct) {
super(null, null);
this.rowStruct = rowStruct;
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
index 9f4eab1..3a8f722 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
@@ -24,17 +24,15 @@ import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.parquet.avro.AvroParquetInputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* DFS Source that reads parquet data.
*/
-public class ParquetDFSSource extends ParquetSource {
+public class ParquetDFSSource extends RowSource {
private final DFSPathSelector pathSelector;
@@ -45,17 +43,15 @@ public class ParquetDFSSource extends ParquetSource {
}
@Override
- protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String>
lastCkptStr, long sourceLimit) {
+ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String>
lastCkptStr, long sourceLimit) {
Pair<Option<String>, String> selectPathsWithMaxModificationTime =
pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr,
sourceLimit);
return selectPathsWithMaxModificationTime.getLeft()
- .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)),
selectPathsWithMaxModificationTime.getRight()))
- .orElseGet(() -> new InputBatch<>(Option.empty(),
selectPathsWithMaxModificationTime.getRight()));
+ .map(pathStr -> Pair.of(Option.of(fromFiles(pathStr)),
selectPathsWithMaxModificationTime.getRight()))
+ .orElseGet(() -> Pair.of(Option.empty(),
selectPathsWithMaxModificationTime.getRight()));
}
- private JavaRDD<GenericRecord> fromFiles(String pathStr) {
- JavaPairRDD<Void, GenericRecord> avroRDD =
sparkContext.newAPIHadoopFile(pathStr, AvroParquetInputFormat.class,
- Void.class, GenericRecord.class, sparkContext.hadoopConfiguration());
- return avroRDD.values();
+ private Dataset<Row> fromFiles(String pathStr) {
+ return sparkSession.read().parquet(pathStr.split(","));
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
index 2afe8bb..0760c73 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/Source.java
@@ -36,7 +36,7 @@ public abstract class Source<T> implements Serializable {
private static final Logger LOG = LogManager.getLogger(Source.class);
public enum SourceType {
- JSON, AVRO, ROW, PARQUET
+ JSON, AVRO, ROW
}
protected transient TypedProperties props;
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/AWSDmsTransformer.java
similarity index 50%
rename from
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java
rename to
hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/AWSDmsTransformer.java
index 58fe5ad..bffa6e4 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/AWSDmsTransformer.java
@@ -16,20 +16,36 @@
* limitations under the License.
*/
-package org.apache.hudi.utilities.sources;
+package org.apache.hudi.utilities.transform;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.TypedProperties;
-import org.apache.hudi.utilities.schema.SchemaProvider;
+import org.apache.hudi.payload.AWSDmsAvroPayload;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-public abstract class ParquetSource extends Source<JavaRDD<GenericRecord>> {
+import java.util.Arrays;
- public ParquetSource(TypedProperties props, JavaSparkContext sparkContext,
SparkSession sparkSession,
- SchemaProvider schemaProvider) {
- super(props, sparkContext, sparkSession, schemaProvider,
SourceType.PARQUET);
+import static org.apache.spark.sql.functions.lit;
+
+/**
+ * A Simple transformer that adds `Op` field with value `I`, for AWS DMS data,
if the field is not
+ * present.
+ */
+public class AWSDmsTransformer implements Transformer {
+
+ @Override
+ public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
Dataset<Row> rowDataset,
+ TypedProperties properties) {
+ Option<String> opColumnOpt = Option.fromJavaOptional(
+ Arrays.stream(rowDataset.columns()).filter(c ->
c.equals(AWSDmsAvroPayload.OP_FIELD)).findFirst());
+ if (opColumnOpt.isPresent()) {
+ return rowDataset;
+ } else {
+ return rowDataset.withColumn(AWSDmsAvroPayload.OP_FIELD, lit(""));
+ }
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestAWSDatabaseMigrationServiceSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestAWSDatabaseMigrationServiceSource.java
new file mode 100644
index 0000000..d015a42
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestAWSDatabaseMigrationServiceSource.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.payload.AWSDmsAvroPayload;
+import org.apache.hudi.utilities.transform.AWSDmsTransformer;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestAWSDatabaseMigrationServiceSource {
+
+ private static JavaSparkContext jsc;
+ private static SparkSession spark;
+
+ @BeforeClass
+ public static void setupTest() {
+ jsc = UtilHelpers.buildSparkContext("aws-dms-test", "local[2]");
+ spark = SparkSession.builder().config(jsc.getConf()).getOrCreate();
+ }
+
+ @AfterClass
+ public static void tearDownTest() {
+ if (jsc != null) {
+ jsc.stop();
+ }
+ }
+
+ @Test
+ public void testPayload() throws IOException {
+ final Schema schema = Schema.createRecord(Arrays.asList(
+ new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null),
+ new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null),
+ new Schema.Field(AWSDmsAvroPayload.OP_FIELD,
Schema.create(Schema.Type.STRING), "", null)
+ ));
+ final GenericRecord record = new GenericData.Record(schema);
+
+ record.put("id", "1");
+ record.put("Op", "");
+ record.put("ts", 0L);
+ AWSDmsAvroPayload payload = new AWSDmsAvroPayload(record, (Comparable)
record.get("ts"));
+ assertTrue(payload.combineAndGetUpdateValue(null, schema).isPresent());
+
+ record.put("Op", "I");
+ payload = new AWSDmsAvroPayload(record, (Comparable) record.get("ts"));
+ assertTrue(payload.combineAndGetUpdateValue(null, schema).isPresent());
+
+ record.put("Op", "D");
+ payload = new AWSDmsAvroPayload(record, (Comparable) record.get("ts"));
+ assertFalse(payload.combineAndGetUpdateValue(null, schema).isPresent());
+ }
+
+ static class Record implements Serializable {
+ String id;
+ long ts;
+
+ Record(String id, long ts) {
+ this.id = id;
+ this.ts = ts;
+ }
+ }
+
+ @Test
+ public void testTransformer() {
+ AWSDmsTransformer transformer = new AWSDmsTransformer();
+ Dataset<Row> inputFrame = spark.createDataFrame(Arrays.asList(
+ new Record("1", 3433L),
+ new Record("2", 3433L)), Record.class);
+
+ Dataset<Row> outputFrame = transformer.apply(jsc, spark, inputFrame, null);
+ assertTrue(Arrays.asList(outputFrame.schema().fields()).stream()
+ .map(f -> f.name()).anyMatch(n ->
n.equals(AWSDmsAvroPayload.OP_FIELD)));
+
assertTrue(outputFrame.select(AWSDmsAvroPayload.OP_FIELD).collectAsList().stream()
+ .allMatch(r -> r.getString(0).equals("")));
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
index 753f947..ed4e037 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
@@ -61,7 +61,6 @@ import java.util.List;
/**
* Abstract test that provides a dfs & spark contexts.
*
- * TODO(vc): this needs to be done across the board.
*/
public class UtilitiesTestBase {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
index 369e385..f7ac61f 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java
@@ -139,7 +139,7 @@ public class TestDFSSource extends UtilitiesTestBase {
TypedProperties props = new TypedProperties();
props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath +
"/parquetFiles");
- ParquetSource parquetDFSSource = new ParquetDFSSource(props, jsc,
sparkSession, schemaProvider);
+ ParquetDFSSource parquetDFSSource = new ParquetDFSSource(props, jsc,
sparkSession, schemaProvider);
SourceFormatAdapter parquetSource = new
SourceFormatAdapter(parquetDFSSource);
// 1. Extract without any checkpoint => get all the data, respecting
sourceLimit