This is an automated email from the ASF dual-hosted git repository.

agingade pushed a commit to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-3781 by this 
push:
     new 624ddd9  Add JDBCManager class with implementation to build an insert 
query string. Also did spotless formatting.
624ddd9 is described below

commit 624ddd97de269d3c749c891a1c3d03d99ccc3d1f
Author: Anil <aging...@pivotal.io>
AuthorDate: Tue Oct 24 18:23:22 2017 -0700

    Add JDBCManager class with implementation to build an insert query string.
    Also did spotless formatting.
---
 .../geode/connectors/jdbc/JDBCAsyncWriter.java     |  19 +++-
 .../geode/connectors/jdbc/JDBCConfiguration.java   |  25 +++--
 .../apache/geode/connectors/jdbc/JDBCHelper.java   |   5 -
 .../apache/geode/connectors/jdbc/JDBCManager.java  | 112 +++++++++++++++++++++
 .../jdbc/JDBCAsyncWriterIntegrationTest.java       |  10 +-
 .../connectors/jdbc/JDBCConfigurationUnitTest.java |   4 +-
 6 files changed, 150 insertions(+), 25 deletions(-)

diff --git 
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
 
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
index fdf619d..54dfbaf 100644
--- 
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
+++ 
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java
@@ -19,6 +19,7 @@ import java.util.Properties;
 
 import org.apache.geode.cache.asyncqueue.AsyncEvent;
 import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.pdx.PdxInstance;
 
 /*
  * This class provides write behind cache semantics for a JDBC data source 
using AsyncEventListener.
@@ -31,6 +32,8 @@ public class JDBCAsyncWriter implements AsyncEventListener {
 
   private long successfulEvents = 0;
 
+  private JDBCManager manager;
+
   @Override
   public void close() {
     // TODO Auto-generated method stub
@@ -40,13 +43,25 @@ public class JDBCAsyncWriter implements AsyncEventListener {
   @Override
   public boolean processEvents(List<AsyncEvent> events) {
     totalEvents += events.size();
-    successfulEvents += events.size();
+    // TODO: set threadLocal to force PDXInstance
+    for (AsyncEvent event : events) {
+      // TODO: in some cases getDeserializedValue may return non-PdxInstance.
+      // In that case need to serialize and deserialize.
+      try {
+        PdxInstance value = (PdxInstance) event.getDeserializedValue();
+        this.manager.write(event.getRegion(), event.getOperation(), 
event.getKey(), value);
+        successfulEvents += 1;
+      } catch (RuntimeException ex) {
+        // TODO: need to log exceptions here
+      }
+    }
     return true;
   }
 
   @Override
   public void init(Properties props) {
-
+    JDBCConfiguration config = new JDBCConfiguration(props);
+    this.manager = new JDBCManager(config);
   };
 
   public long getTotalEvents() {
diff --git 
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCConfiguration.java
 
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCConfiguration.java
index 2e3e061..763bbde 100644
--- 
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCConfiguration.java
+++ 
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCConfiguration.java
@@ -9,23 +9,25 @@ import java.util.Properties;
 import java.util.Set;
 
 public class JDBCConfiguration {
-  
+
   private static final String DRIVER = "driver";
-  
+
   private static final String URL = "url";
-  
+
   private static final String USER = "user";
-  
+
   private static final String PASSWORD = "password";
- 
-  private static final List<String> knownProperties = 
Collections.unmodifiableList(Arrays.asList(DRIVER, URL, USER, PASSWORD));
 
-  private static final List<String> requiredProperties = 
Collections.unmodifiableList(Arrays.asList(DRIVER, URL));
-  
+  private static final List<String> knownProperties =
+      Collections.unmodifiableList(Arrays.asList(DRIVER, URL, USER, PASSWORD));
+
+  private static final List<String> requiredProperties =
+      Collections.unmodifiableList(Arrays.asList(DRIVER, URL));
+
   private final String driver;
 
   private String url;
-  
+
   JDBCConfiguration(Properties configProps) {
     validateKnownProperties(configProps);
     validateRequiredProperties(configProps);
@@ -49,12 +51,13 @@ public class JDBCConfiguration {
       throw new IllegalArgumentException("missing required properties: " + 
reqKeys);
     }
   }
-  
+
   public String getDriver() {
     return this.driver;
   }
+
   public String getURL() {
     return this.url;
   }
-  
+
 }
diff --git 
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCHelper.java
 
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCHelper.java
deleted file mode 100644
index edad4c4..0000000
--- 
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCHelper.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package org.apache.geode.connectors.jdbc;
-
-public class JDBCHelper {
-
-}
diff --git 
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
 
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
new file mode 100644
index 0000000..13a94c9
--- /dev/null
+++ 
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
@@ -0,0 +1,112 @@
+package org.apache.geode.connectors.jdbc;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.pdx.PdxInstance;
+
+public class JDBCManager {
+
+  private final JDBCConfiguration config;
+
+  private Connection conn;
+
+  private Statement stmt;
+
+  JDBCManager(JDBCConfiguration config) {
+    this.config = config;
+  }
+
+  private void establishConnection() {
+    // Class.forName(this.config.getDriver());
+    // conn = DriverManager.getConnection(this.config.getURL());
+    // stmt = conn.createStatement();
+  }
+
+  public interface ColumnValue {
+    String getColumnName();
+
+    Object getValue();
+  }
+
+  public void write(Region region, Operation operation, Object key, 
PdxInstance value) {
+    String tableName = getTableName(region);
+    List<ColumnValue> columnList = getColumnToValueList(tableName, key, value);
+    String query = getQueryString(tableName, columnList, operation);
+    Statement statement = getQueryStatement(columnList, query);
+    try {
+      statement.execute(query);
+    } catch (SQLException e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    }
+  }
+
+  private String getQueryString(String tableName, List<ColumnValue> columnList,
+      Operation operation) {
+    if (operation.isCreate()) {
+      return getInsertQueryString(tableName, columnList);
+    } else if (operation.isUpdate()) {
+      return getUpdateQueryString(tableName, columnList);
+    } else if (operation.isDestroy()) {
+      return getDestroyQueryString(tableName, columnList);
+    } else {
+      throw new IllegalStateException("unsupported operation " + operation);
+    }
+  }
+
+  private String getDestroyQueryString(String tableName, List<ColumnValue> 
columnList) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  private String getUpdateQueryString(String tableName, List<ColumnValue> 
columnList) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  private String getInsertQueryString(String tableName, List<ColumnValue> 
columnList) {
+    StringBuilder columnNames = new StringBuilder("INSERT INTO " + tableName + 
'(');
+    StringBuilder columnValues = new StringBuilder(" VALUES (");
+    int columnCount = columnList.size();
+    int idx = 0;
+    for (ColumnValue cv : columnList) {
+      idx++;
+      columnNames.append(cv.getColumnName());
+      columnValues.append('?');
+      if (idx != columnCount) {
+        columnNames.append(", ");
+        columnValues.append(",");
+      }
+    }
+    columnNames.append(")");
+    columnValues.append(")");
+    return columnNames.append(columnValues).toString();
+  }
+
+  private Statement getQueryStatement(List<ColumnValue> columnList, String 
query) {
+
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  private List<ColumnValue> getColumnToValueList(String tableName, Object key, 
PdxInstance value) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  private String getTableName(Region region) {
+    // TODO: check config for mapping
+    return region.getName();
+  }
+
+
+
+}
diff --git 
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
 
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
index 4b5d14d..68c1de0 100644
--- 
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
+++ 
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
@@ -46,7 +46,7 @@ public class JDBCAsyncWriterIntegrationTest {
 
   JDBCAsyncWriter jdbcWriter;
 
-  private String dbName="DerbyDB";
+  private String dbName = "DerbyDB";
 
   private String regionTableName = "employees";
 
@@ -100,7 +100,7 @@ public class JDBCAsyncWriterIntegrationTest {
     employees.put("2", "Emp2");
 
     Awaitility.await().atMost(30, TimeUnit.SECONDS)
-    .until(() -> assertThat(jdbcWriter.getTotalEvents()).isEqualTo(2));
+        .until(() -> assertThat(jdbcWriter.getTotalEvents()).isEqualTo(2));
 
   }
 
@@ -112,15 +112,15 @@ public class JDBCAsyncWriterIntegrationTest {
     employees.put("2", "Emp2");
 
     Awaitility.await().atMost(30, TimeUnit.SECONDS)
-    .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+        .until(() -> 
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
 
     validateTableRowCount(2);
   }
 
   private Region createRegionWithJDBCAsyncWriter(String regionName) {
     jdbcWriter = new JDBCAsyncWriter();
-    cache.createAsyncEventQueueFactory().setBatchSize(1)
-    .setBatchTimeInterval(1).create("jdbcAsyncQueue", jdbcWriter);
+    
cache.createAsyncEventQueueFactory().setBatchSize(1).setBatchTimeInterval(1)
+        .create("jdbcAsyncQueue", jdbcWriter);
 
     RegionFactory rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
     rf.addAsyncEventQueueId("jdbcAsyncQueue");
diff --git 
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCConfigurationUnitTest.java
 
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCConfigurationUnitTest.java
index afefb16..c994e98 100644
--- 
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCConfigurationUnitTest.java
+++ 
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCConfigurationUnitTest.java
@@ -55,7 +55,7 @@ public class JDBCConfigurationUnitTest {
     expectedException.expectMessage("missing required properties: [driver]");
     new JDBCConfiguration(props);
   }
-  
+
   @Test
   public void testDriverProperty() {
     Properties props = new Properties();
@@ -64,7 +64,7 @@ public class JDBCConfigurationUnitTest {
     JDBCConfiguration config = new JDBCConfiguration(props);
     assertThat(config.getDriver()).isEqualTo("myDriver");
   }
-  
+
   @Test
   public void testURLProperty() {
     Properties props = new Properties();

-- 
To stop receiving notification emails like this one, please contact
['"commits@geode.apache.org" <commits@geode.apache.org>'].

Reply via email to