taherk77 commented on a change in pull request #962: [HUDI-251] JDBC
incremental load to HUDI DeltaStreamer
URL: https://github.com/apache/incubator-hudi/pull/962#discussion_r336562719
##########
File path:
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
##########
@@ -0,0 +1,358 @@
+package org.apache.hudi.utilities.sources;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.IntStream;
+import javax.xml.crypto.Data;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.TypedProperties;
+import org.apache.hudi.utilities.UtilitiesTestBase;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.DataTypes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.spark.sql.functions;
+
+public class TestJdbcSource extends UtilitiesTestBase {
+
+ private static Logger LOG = LoggerFactory.getLogger(JDBCSource.class);
+ private TypedProperties props = new TypedProperties();
+
+ @BeforeClass
+ public static void initClass() throws Exception {
+ UtilitiesTestBase.initClass();
+ }
+
+ @AfterClass
+ public static void cleanupClass() throws Exception {
+ UtilitiesTestBase.cleanupClass();
+ }
+
+ @Before
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ cleanDerby();
+ }
+
+ @After
+ @Override
+ public void teardown() throws Exception {
+ super.teardown();
+ cleanDerby();
+ DatabaseUtils.closeAllJdbcResources();
+ }
+
+ private void addSecretFileToProps(TypedProperties props) {
+ props.setProperty("hoodie.datasource.jdbc.password.file",
DatabaseUtils.getFullPathToSecret());
+ }
+
+ private void addSecretStringToProps(TypedProperties props) {
+ props.setProperty("hoodie.datasource.jdbc.password",
DatabaseUtils.getSecret());
+ }
+
+
+ private void prepareProps(TypedProperties props) {
+ props.setProperty("hoodie.datasource.jdbc.url", DatabaseUtils.getUrl());
+ props.setProperty("hoodie.datasource.jdbc.user", DatabaseUtils.getUser());
+ props.setProperty("hoodie.datasource.jdbc.table.name", "trips");
+ props.setProperty("hoodie.datasource.jdbc.driver.class",
"org.apache.derby.jdbc.EmbeddedDriver");
+ props.setProperty("hoodie.datasource.jdbc.jar.path", "");
+ props.setProperty("hoodie.datasource.write.recordkey.field", "row_key");
+ props.setProperty("hoodie.datasource.write.partitionpath.field", "driver");
+ props.setProperty("hoodie.datasource.jdbc.incremental.pull", "true");
+ props.setProperty("hoodie.datasource.jdbc.table.incremental.column.name",
"last_insert");
+ props.setProperty("hoodie.datasource.jdbc.extra.options",
"fetchsize=1000");
+ }
+
+ private void clear(TypedProperties props) {
+ props.clear();
+ }
+
+ private void writeSecretToFs() throws IOException {
+ FileSystem fs = FileSystem.get(new Configuration());
+ FSDataOutputStream outputStream = fs.create(new
Path(DatabaseUtils.getFullPathToSecret()));
+ outputStream.writeBytes(DatabaseUtils.getSecret());
+ outputStream.close();
+ }
+
+ public void cleanDerby() {
+ try {
+ Files.walk(Paths.get(DatabaseUtils.getDerbyDir()))
+ .sorted(Comparator.reverseOrder())
+ .map(java.nio.file.Path::toFile)
+ .forEach(File::delete);
+ } catch (Exception e) {
+ //exit silently
+ return;
+ }
+ }
+
+ @Test
+ public void testOnlyInserts() {
+ try {
+ int recordsToInsert = 100;
+ String commitTime = "000";
+
+ //Insert records to Derby
+ DatabaseUtils.clearAndInsert(commitTime, recordsToInsert);
+
+ //Validate if we have specified records in db
+ Assert.assertEquals(recordsToInsert, DatabaseUtils.count());
+
+ //Prepare props
+ clear(props);
+ prepareProps(props);
+ addSecretStringToProps(props);
+
+ //Start JDBCSource
+ Source jdbcSource = new JDBCSource(props, jsc, sparkSession, null);
+ InputBatch inputBatch = jdbcSource.fetchNewData(Option.empty(),
Long.MAX_VALUE);
+ Dataset<Row> rowDataset = (Dataset<Row>) inputBatch.getBatch().get();
+
+ //Assert if recordsToInsert are equal to the records in the DF
+ long count = rowDataset.count();
+ LOG.info("Num records in output {}", count);
+ Assert.assertEquals(count, recordsToInsert);
+
+ } catch (IOException | SQLException e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+ }
+
/**
 * WIP (per review comment): exercises an insert followed by an update of half
 * the rows, but does not yet run the JDBC source or assert anything about the
 * fetched data — both generated record lists are currently unused.
 * TODO(review): complete the source fetch and assertions before merging.
 */
@Test
public void testInsertAndUpdate() throws IOException, SQLException {
  final String commitTime = "000";
  final int recordsToInsert = 100;
  final int recordsToUpdate = 50; // NOTE(review): unused — presumably meant to bound the update; confirm.

  List<HoodieRecord> insert = DatabaseUtils.clearAndInsert(commitTime, recordsToInsert);
  List<HoodieRecord> update = DatabaseUtils.update(commitTime, insert); // result unused — TODO assert on it
}
Review comment:
PR is not final yet. WIP
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services