taherk77 commented on a change in pull request #962: [HUDI-251] JDBC 
incremental load to HUDI DeltaStreamer
URL: https://github.com/apache/incubator-hudi/pull/962#discussion_r336429778
 
 

 ##########
 File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
 ##########
 @@ -0,0 +1,358 @@
+package org.apache.hudi.utilities.sources;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.IntStream;
+import javax.xml.crypto.Data;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.utilities.UtilitiesTestBase;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.spark.sql.functions;
+
+public class TestJdbcSource extends UtilitiesTestBase {
+
+  private static Logger LOG = LoggerFactory.getLogger(JDBCSource.class);
+  private TypedProperties props = new TypedProperties();
+
  // One-time initialization of the shared test harness (Spark context, etc.)
  // before any test in this class runs.
  @BeforeClass
  public static void initClass() throws Exception {
    UtilitiesTestBase.initClass();
  }
+
  // Tears down the shared test harness after all tests in this class complete.
  @AfterClass
  public static void cleanupClass() throws Exception {
    UtilitiesTestBase.cleanupClass();
  }
+
  // Per-test setup: initialize the harness, then remove any stale embedded
  // Derby files so each test starts against a clean database.
  @Before
  @Override
  public void setup() throws Exception {
    super.setup();
    cleanDerby();
  }
+
+  @After
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    cleanDerby();
+    DatabaseUtils.closeAllJdbcResources();
+  }
+
+  private void addSecretFileToProps(TypedProperties props) {
+    props.setProperty("hoodie.datasource.jdbc.password.file", 
DatabaseUtils.getFullPathToSecret());
+  }
+
+  private void addSecretStringToProps(TypedProperties props) {
+    props.setProperty("hoodie.datasource.jdbc.password", 
DatabaseUtils.getSecret());
+  }
+
+
+  private void prepareProps(TypedProperties props) {
+    props.setProperty("hoodie.datasource.jdbc.url", DatabaseUtils.getUrl());
+    props.setProperty("hoodie.datasource.jdbc.user", DatabaseUtils.getUser());
+    props.setProperty("hoodie.datasource.jdbc.table.name", "trips");
+    props.setProperty("hoodie.datasource.jdbc.driver.class", 
"org.apache.derby.jdbc.EmbeddedDriver");
+    props.setProperty("hoodie.datasource.jdbc.jar.path", "");
+    props.setProperty("hoodie.datasource.write.recordkey.field", "row_key");
+    props.setProperty("hoodie.datasource.write.partitionpath.field", "driver");
+    props.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    props.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", 
"last_insert");
+    props.setProperty("hoodie.datasource.jdbc.extra.options", 
"fetchsize=1000");
+  }
+
  // Removes all previously-set properties so each test configures from scratch.
  private void clear(TypedProperties props) {
    props.clear();
  }
+
+  private void writeSecretToFs() throws IOException {
+    FileSystem fs = FileSystem.get(new Configuration());
+    FSDataOutputStream outputStream = fs.create(new 
Path(DatabaseUtils.getFullPathToSecret()));
+    outputStream.writeBytes(DatabaseUtils.getSecret());
+    outputStream.close();
+  }
+
+  public void cleanDerby() {
+    try {
+      Files.walk(Paths.get(DatabaseUtils.getDerbyDir()))
+          .sorted(Comparator.reverseOrder())
+          .map(java.nio.file.Path::toFile)
+          .forEach(File::delete);
+    } catch (Exception e) {
+      //exit silently
+      return;
+    }
+  }
+
+  @Test
+  public void testOnlyInserts() {
+    try {
+      int recordsToInsert = 100;
+      String commitTime = "000";
+
+      //Insert records to Derby
+      DatabaseUtils.clearAndInsert(commitTime, recordsToInsert);
+
+      //Validate if we have specified records in db
+      Assert.assertEquals(recordsToInsert, DatabaseUtils.count());
+
+      //Prepare props
+      clear(props);
+      prepareProps(props);
+      addSecretStringToProps(props);
+
+      //Start JDBCSource
+      Source jdbcSource = new JDBCSource(props, jsc, sparkSession, null);
+      InputBatch inputBatch = jdbcSource.fetchNewData(Option.empty(), 
Long.MAX_VALUE);
+      Dataset<Row> rowDataset = (Dataset<Row>) inputBatch.getBatch().get();
+
+      //Assert if recordsToInsert are equal to the records in the DF
+      long count = rowDataset.count();
+      LOG.info("Num records in output {}", count);
+      Assert.assertEquals(count, recordsToInsert);
+
+    } catch (IOException | SQLException e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testInsertAndUpdate() throws IOException, SQLException {
+    final String commitTime = "000";
+    final int recordsToInsert = 100;
+    final int recordsToUpdate = 50;
+
+    List<HoodieRecord> insert = DatabaseUtils.clearAndInsert(commitTime, 
recordsToInsert);
+    List<HoodieRecord> update = DatabaseUtils.update(commitTime, insert);
+  }
+
+  @Test
+  public void testCheckpointingWhenPipelineIsIncremental() throws IOException, 
SQLException {
+    DatabaseUtils.clearAndInsert("000", 10);
+
+    //Prepare props
+    clear(props);
+    prepareProps(props);
+    addSecretStringToProps(props);
+
+    //Start JDBCSource
+    Source jdbcSource = new JDBCSource(props, jsc, sparkSession, null);
+    InputBatch inputBatch = jdbcSource.fetchNewData(Option.empty(), 
Long.MAX_VALUE);
+    Dataset<Row> rowDataset = (Dataset<Row>) inputBatch.getBatch().get();
+    String incrementalVal = rowDataset.agg(functions.max(functions.col(
+        
props.getString("hoodie.datasource.jdbc.table.incremental.column.name"))).cast(DataTypes.StringType)).first()
+        .getString(0);
+    System.out.println("Incremental value is " + incrementalVal);
+
+    DatabaseUtils.insert("000", 5);
+    Source jdbcSource1 = new JDBCSource(props, jsc, sparkSession, null);
+    InputBatch inputBatch1 = 
jdbcSource1.fetchNewData(Option.of(incrementalVal), Long.MAX_VALUE);
 
 Review comment:
   The way we do JDBC incremental pull is that we use the checkpoint 
value to prepare a PPD (predicate push-down) query. The query we prepare inside the JDBCSource with 
the incremental value is "(select * from table where incremental_column > 
'2019-09-09 15:45:01.53') rdbms_table", and we expect that the incremental 
column is either a timestamp or an int/long value. 
   
   This worked well when we tried it with MySQL, and MySQL was fine converting the 
string column into an int, long, or timestamp based on whichever column it was 
compared with. However, Derby does not do that and throws exceptions. 
If the PPD query is changed to "(select * from table where incremental_column 
> Timestamp('2019-09-09 15:45:01.53')) rdbms_table" then it works, as the 
incremental column value has been cast/converted to a timestamp — but plain 
strings still do not work here.  

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to