taherk77 commented on a change in pull request #969: [HUDI-251] JDBC 
incremental load to HUDI DeltaStreamer
URL: https://github.com/apache/incubator-hudi/pull/969#discussion_r338899781
 
 

 ##########
 File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
 ##########
 @@ -0,0 +1,529 @@
+package org.apache.hudi.utilities.sources;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.utilities.UtilitiesTestBase;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.functions;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestJdbcSource extends UtilitiesTestBase {
+
+  private static Logger LOG = LogManager.getLogger(JDBCSource.class);
+  private TypedProperties props = new TypedProperties();
+
+  @BeforeClass
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass();
+    DatabaseUtils.closeAllJdbcResources();
+    cleanDerby();
+  }
+
+  @AfterClass
+  public static void cleanupClass() throws Exception {
+    UtilitiesTestBase.cleanupClass();
+    DatabaseUtils.closeAllJdbcResources();
+    cleanDerby();
+  }
+
+  @Before
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+  }
+
+  @After
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+  }
+
+  private void addSecretStringToProps(TypedProperties props) {
+    props.setProperty("hoodie.datasource.jdbc.password", 
DatabaseUtils.getSecret());
+  }
+
+
+  private void prepareProps(TypedProperties props) {
+    props.setProperty("hoodie.datasource.jdbc.url", DatabaseUtils.getUrl());
+    props.setProperty("hoodie.datasource.jdbc.user", DatabaseUtils.getUser());
+    props.setProperty("hoodie.datasource.jdbc.table.name", "trips");
+    props.setProperty("hoodie.datasource.jdbc.driver.class", 
"org.apache.derby.jdbc.EmbeddedDriver");
+    props.setProperty("hoodie.datasource.write.recordkey.field", "row_key");
+    props.setProperty("hoodie.datasource.write.partitionpath.field", "driver");
+    props.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+    props.setProperty("hoodie.datasource.jdbc.table.incremental.column.name", 
"last_insert");
+    props.setProperty("hoodie.datasource.jdbc.extra.options.fetchsize", 
"1000");
+  }
+
+  private void clear(TypedProperties props) {
+    props.clear();
+  }
+
+  private void writeSecretToFs() throws IOException {
+    FileSystem fs = FileSystem.get(new Configuration());
+    FSDataOutputStream outputStream = fs.create(new 
Path(DatabaseUtils.getFullPathToSecret()));
+    outputStream.writeBytes(DatabaseUtils.getSecret());
+    outputStream.close();
+  }
+
+  public static void cleanDerby() {
+    try {
+      Files.walk(Paths.get(DatabaseUtils.getDerbyDir()))
+          .sorted(Comparator.reverseOrder())
+          .map(java.nio.file.Path::toFile)
+          .forEach(File::delete);
+    } catch (Exception e) {
+      //exit silently
+      return;
+    } finally {
+      //Needed to avoid connection issues and shut derby
+      System.gc();
+    }
+  }
+
+  @Test
+  public void testSingleCommit() {
+    try {
+      int recordsToInsert = 100;
+      String commitTime = "000";
+
+      //Insert records to Derby
+      DatabaseUtils.clearAndInsert(commitTime, recordsToInsert);
+
+      //Validate if we have specified records in db
+      Assert.assertEquals(recordsToInsert, DatabaseUtils.count());
+
+      //Prepare props
+      getProps();
+
+      //Start JDBCSource
+      runSourceAndAssert(Option.empty(), recordsToInsert);
+
+    } catch (IOException | SQLException e) {
+      handle(e);
+    }
+  }
+
+  private void handle(Exception e) {
+    e.printStackTrace();
+    Assert.fail(e.getMessage());
+  }
+
+  @Test
+  public void testInsertAndUpdate() {
+    try {
+      getProps();
+      final String commitTime = "000";
+      final int recordsToInsert = 100;
+
+      //Add 100 records
+      //Update half of them with commit time "007"
+      //Check if updates are through or not
+      DatabaseUtils.update("007",
+          DatabaseUtils.clearAndInsert(commitTime, recordsToInsert)
+              .stream()
+              .limit(50)
+              .collect(Collectors.toList())
+      );
+      //Check if database has 100 records
+      Assert.assertEquals("Database has 100 records", 100, 
DatabaseUtils.count());
+
+      //Start JDBCSource
+      Dataset<Row> rowDataset = runSourceAndAssert(Option.empty(), 100);
+
+      Dataset<Row> firstCommit = rowDataset.where("commit_time=000");
+      Assert.assertEquals("Should have 50 records with commit time 000", 50, 
firstCommit.count());
+
+      Dataset<Row> secondCommit = rowDataset.where("commit_time=007");
+      Assert.assertEquals("Should have 50 records with commit time 000", 50, 
secondCommit.count());
+
+    } catch (Exception e) {
+      handle(e);
+    }
+  }
+
+  @Test
+  public void testSourceWithPasswordOnFs() {
+    try {
+      //Write secret string to fs in a file
+      writeSecretToFs();
+      //Prepare props
+      getProps();
+      //Remove secret string from props
+      props.remove("hoodie.datasource.jdbc.password");
+      //Set property to read secret from fs file
+      props.setProperty("hoodie.datasource.jdbc.password.file", 
DatabaseUtils.getFullPathToSecret());
+      //Add 10 records with commit time 000
+      DatabaseUtils.clearAndInsert("000", 10);
+      runSourceAndAssert(Option.empty(), 10);
+    } catch (Exception e) {
+      handle(e);
+    }
+  }
+
+
+  @Test(expected = NoSuchElementException.class)
+  //Internally JDBCSource should throw an IllegalArgumentException as the 
password is not give.
+  //However it is internally handled. Even so we assert NoSuchElementException 
which is thrown
+  //from the Options class there is no Dataset computed.
+  public void testSourceWithNoPassword() {
+    try {
+      //Write secret string to fs in a file
+      writeSecretToFs();
+      //Prepare props
+      getProps();
+      //Remove secret string from props
+      props.remove("hoodie.datasource.jdbc.password");
+
+      //Add 10 records with commit time 000
+      DatabaseUtils.clearAndInsert("000", 10);
+      runSourceAndAssert(Option.empty(), 10);
+    } catch (IOException | SQLException e) {
+      handle(e);
+    }
+  }
+
+
+  @Test
+  public void testTwoCommits() {
+    try {
+      //Prepare props
+      getProps();
+
+      //Add 10 records with commit time "000"
+      DatabaseUtils.clearAndInsert("000", 10);
+
+      //Start JDBCSource
+      Dataset<Row> rowDataset = runSourceAndAssert(Option.empty(), 10);
+      Assert.assertEquals(10, rowDataset.where("commit_time=000").count());
+
+      //Add 10 records with commit time 001
+      DatabaseUtils.insert("001", 5);
+      Assert.assertEquals(5, rowDataset.where("commit_time=001").count());
+
+      //Start second commit and check if all records are pulled
+      runSourceAndAssert(Option.empty(), 15);
+
+
+    } catch (Exception e) {
+      handle(e);
+    }
+
+  }
+
+  private Dataset<Row> runSourceAndAssert(Option<String> option, int expected) 
{
+    Source jdbcSource = new JDBCSource(props, jsc, sparkSession, null);
+    InputBatch inputBatch = jdbcSource.fetchNewData(option, Long.MAX_VALUE);
+    Dataset<Row> rowDataset = (Dataset<Row>) inputBatch.getBatch().get();
+    Assert.assertEquals(expected, rowDataset.count());
+    return rowDataset;
+  }
+
+
+  @Test
+  public void testIncrementalScanWithTimestamp() {
+    try {
+      getProps();
+
+      //Add 10 records with commit time "000"
+      DatabaseUtils.clearAndInsert("000", 10);
+
+      //Start JDBCSource
+      Dataset<Row> rowDataset = runSourceAndAssert(Option.empty(), 10);
+
+      //get max of incremental column
+      Column incrementalColumn = rowDataset
+          
.col(props.getString("hoodie.datasource.jdbc.table.incremental.column.name"));
+      final String max = 
rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first()
+          .getString(0);
+      LOG.info(String.format("Incremental max value: %s", max));
+
+      //Add 10 records with commit time "001"
+      DatabaseUtils.insert("001", 10);
+
+      //Start Incremental scan
+      //Because the Derby incremental-scan PPQ query fails due to timestamp
+      //issues, the incremental scan should throw an exception internally and
+      //fall back to a full scan. Hence validate whether all the rows of the
+      //table are selected or not
+      Dataset<Row> rowDataset1 = runSourceAndAssert(Option.of(max), 20);
+      Assert.assertEquals(10, rowDataset.where("commit_time=000").count());
+      Assert.assertEquals(10, rowDataset.where("commit_time=001").count());
 
 Review comment:
   will correct that

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to