taherk77 commented on a change in pull request #962: [HUDI-251] JDBC
incremental load to HUDI DeltaStreamer
URL: https://github.com/apache/incubator-hudi/pull/962#discussion_r336429778
##########
File path:
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
##########
@@ -0,0 +1,358 @@
+package org.apache.hudi.utilities.sources;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.IntStream;
+import javax.xml.crypto.Data;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.utilities.UtilitiesTestBase;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.spark.sql.functions;
+
+public class TestJdbcSource extends UtilitiesTestBase {
+
+ private static Logger LOG = LoggerFactory.getLogger(JDBCSource.class);
+ private TypedProperties props = new TypedProperties();
+
  /** One-time class setup: delegates to the shared {@link UtilitiesTestBase} harness. */
  @BeforeClass
  public static void initClass() throws Exception {
    UtilitiesTestBase.initClass();
  }
+
  /** One-time class teardown: delegates to the shared {@link UtilitiesTestBase} harness. */
  @AfterClass
  public static void cleanupClass() throws Exception {
    UtilitiesTestBase.cleanupClass();
  }
+
  /** Per-test setup: base-class setup, then wipe the Derby directory for a clean database. */
  @Before
  @Override
  public void setup() throws Exception {
    super.setup();
    cleanDerby();
  }
+
+ @After
+ @Override
+ public void teardown() throws Exception {
+ super.teardown();
+ cleanDerby();
+ DatabaseUtils.closeAllJdbcResources();
+ }
+
+ private void addSecretFileToProps(TypedProperties props) {
+ props.setProperty("hoodie.datasource.jdbc.password.file",
DatabaseUtils.getFullPathToSecret());
+ }
+
+ private void addSecretStringToProps(TypedProperties props) {
+ props.setProperty("hoodie.datasource.jdbc.password",
DatabaseUtils.getSecret());
+ }
+
+
+ private void prepareProps(TypedProperties props) {
+ props.setProperty("hoodie.datasource.jdbc.url", DatabaseUtils.getUrl());
+ props.setProperty("hoodie.datasource.jdbc.user", DatabaseUtils.getUser());
+ props.setProperty("hoodie.datasource.jdbc.table.name", "trips");
+ props.setProperty("hoodie.datasource.jdbc.driver.class",
"org.apache.derby.jdbc.EmbeddedDriver");
+ props.setProperty("hoodie.datasource.jdbc.jar.path", "");
+ props.setProperty("hoodie.datasource.write.recordkey.field", "row_key");
+ props.setProperty("hoodie.datasource.write.partitionpath.field", "driver");
+ props.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+ props.setProperty("hoodie.datasource.jdbc.table.incremental.column.name",
"last_insert");
+ props.setProperty("hoodie.datasource.jdbc.extra.options",
"fetchsize=1000");
+ }
+
  /** Removes all previously-set keys so each test builds its properties from scratch. */
  private void clear(TypedProperties props) {
    props.clear();
  }
+
+ private void writeSecretToFs() throws IOException {
+ FileSystem fs = FileSystem.get(new Configuration());
+ FSDataOutputStream outputStream = fs.create(new
Path(DatabaseUtils.getFullPathToSecret()));
+ outputStream.writeBytes(DatabaseUtils.getSecret());
+ outputStream.close();
+ }
+
+ public void cleanDerby() {
+ try {
+ Files.walk(Paths.get(DatabaseUtils.getDerbyDir()))
+ .sorted(Comparator.reverseOrder())
+ .map(java.nio.file.Path::toFile)
+ .forEach(File::delete);
+ } catch (Exception e) {
+ //exit silently
+ return;
+ }
+ }
+
+ @Test
+ public void testOnlyInserts() {
+ try {
+ int recordsToInsert = 100;
+ String commitTime = "000";
+
+ //Insert records to Derby
+ DatabaseUtils.clearAndInsert(commitTime, recordsToInsert);
+
+ //Validate if we have specified records in db
+ Assert.assertEquals(recordsToInsert, DatabaseUtils.count());
+
+ //Prepare props
+ clear(props);
+ prepareProps(props);
+ addSecretStringToProps(props);
+
+ //Start JDBCSource
+ Source jdbcSource = new JDBCSource(props, jsc, sparkSession, null);
+ InputBatch inputBatch = jdbcSource.fetchNewData(Option.empty(),
Long.MAX_VALUE);
+ Dataset<Row> rowDataset = (Dataset<Row>) inputBatch.getBatch().get();
+
+ //Assert if recordsToInsert are equal to the records in the DF
+ long count = rowDataset.count();
+ LOG.info("Num records in output {}", count);
+ Assert.assertEquals(count, recordsToInsert);
+
+ } catch (IOException | SQLException e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testInsertAndUpdate() throws IOException, SQLException {
+ final String commitTime = "000";
+ final int recordsToInsert = 100;
+ final int recordsToUpdate = 50;
+
+ List<HoodieRecord> insert = DatabaseUtils.clearAndInsert(commitTime,
recordsToInsert);
+ List<HoodieRecord> update = DatabaseUtils.update(commitTime, insert);
+ }
+
+ @Test
+ public void testCheckpointingWhenPipelineIsIncremental() throws IOException,
SQLException {
+ DatabaseUtils.clearAndInsert("000", 10);
+
+ //Prepare props
+ clear(props);
+ prepareProps(props);
+ addSecretStringToProps(props);
+
+ //Start JDBCSource
+ Source jdbcSource = new JDBCSource(props, jsc, sparkSession, null);
+ InputBatch inputBatch = jdbcSource.fetchNewData(Option.empty(),
Long.MAX_VALUE);
+ Dataset<Row> rowDataset = (Dataset<Row>) inputBatch.getBatch().get();
+ String incrementalVal = rowDataset.agg(functions.max(functions.col(
+
props.getString("hoodie.datasource.jdbc.table.incremental.column.name"))).cast(DataTypes.StringType)).first()
+ .getString(0);
+ System.out.println("Incremental value is " + incrementalVal);
+
+ DatabaseUtils.insert("000", 5);
+ Source jdbcSource1 = new JDBCSource(props, jsc, sparkSession, null);
+ InputBatch inputBatch1 =
jdbcSource1.fetchNewData(Option.of(incrementalVal), Long.MAX_VALUE);
Review comment:
The way we do JDBC incremental pull is that we use the checkpoint
value to prepare a PPD (predicate pushdown) query. The query we prepare inside
the JDBCSource with the incremental value is "(select * from table where
incremental_column > '2019-09-09 15:45:01.53') rdbms_table", and we expect the
incremental column to be either a timestamp or an int/long value.
This worked well when we tried it with MySQL: MySQL was fine converting the
string literal into int, long, or timestamp based on whichever column it was
compared with. However, Derby does not do that and throws exceptions instead.
If the PPD query is changed to "(select * from table where incremental_column
> Timestamp('2019-09-09 15:45:01.53')) rdbms_table" then it works, as the
incremental value has been explicitly cast/converted to a timestamp, but plain
strings do not work with that form.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services