This is an automated email from the ASF dual-hosted git repository.
agingade pushed a commit to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-3781 by this push:
new c80d024 Added support for upsert operation.
c80d024 is described below
commit c80d024816046e3929bc316d6e007f607db36f07
Author: Anil <[email protected]>
AuthorDate: Thu Oct 26 16:24:55 2017 -0700
Added support for upsert operation.
---
.../apache/geode/connectors/jdbc/JDBCManager.java | 26 +++++++++++-
.../jdbc/JDBCAsyncWriterIntegrationTest.java | 49 ++++++++++++++++++++++
2 files changed, 74 insertions(+), 1 deletion(-)
diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
index 86980e0..1b98da6 100644
--- a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
+++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCManager.java
@@ -69,6 +69,26 @@ public class JDBCManager {
public void write(Region region, Operation operation, Object key, PdxInstance value) {
String tableName = getTableName(region);
List<ColumnValue> columnList = getColumnToValueList(tableName, key, value, operation);
+ int updateCount = executeWrite(columnList, tableName, operation, false);
+ if (operation.isDestroy()) {
+ return;
+ }
+ if (updateCount <= 0) {
+ Operation upsertOp;
+ if (operation.isUpdate()) {
+ upsertOp = Operation.CREATE;
+ } else {
+ upsertOp = Operation.UPDATE;
+ }
+ updateCount = executeWrite(columnList, tableName, upsertOp, true);
+ }
+ if (updateCount != 1) {
+ throw new IllegalStateException("Unexpected updateCount " + updateCount);
+ }
+ }
+
+ private int executeWrite(List<ColumnValue> columnList, String tableName, Operation operation,
+ boolean handleException) {
PreparedStatement pstmt = getQueryStatement(columnList, tableName, operation);
try {
int idx = 0;
@@ -77,8 +97,12 @@ public class JDBCManager {
pstmt.setObject(idx, cv.getValue());
}
pstmt.execute();
+ return pstmt.getUpdateCount();
} catch (SQLException e) {
- handleSQLException(e);
+ if (handleException || operation.isDestroy()) {
+ handleSQLException(e);
+ }
+ return 0;
} finally {
clearStatement(pstmt);
}
diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
index 42999a3..87b654b 100644
--- a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
+++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java
@@ -231,6 +231,55 @@ public class JDBCAsyncWriterIntegrationTest {
assertThat(rs.next()).isFalse();
}
+ @Test
+ public void canUpdateBecomeInsert() throws Exception {
+ Region employees = createRegionWithJDBCAsyncWriter(regionTableName, getRequiredProperties());
+ PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
+ .writeInt("age", 55).create();
+ employees.put("1", pdx1);
+
+ Awaitility.await().atMost(30, TimeUnit.SECONDS)
+ .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+ stmt.execute("delete from " + regionTableName + " where id = '1'");
+ validateTableRowCount(0);
+
+ PdxInstance pdx3 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
+ .writeInt("age", 72).create();
+ employees.put("1", pdx3);
+
+ Awaitility.await().atMost(10, TimeUnit.SECONDS)
+ .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+
+ ResultSet rs = stmt.executeQuery("select * from " + regionTableName + " order by id asc");
+ assertThat(rs.next()).isTrue();
+ assertThat(rs.getString("id")).isEqualTo("1");
+ assertThat(rs.getString("name")).isEqualTo("Emp1");
+ assertThat(rs.getObject("age")).isEqualTo(72);
+ assertThat(rs.next()).isFalse();
+ }
+
+ @Test
+ public void canInsertBecomeUpdate() throws Exception {
+ stmt.execute("Insert into " + regionTableName + " values('1', 'bogus', 11)");
+ validateTableRowCount(1);
+
+ Region employees = createRegionWithJDBCAsyncWriter(regionTableName, getRequiredProperties());
+ PdxInstance pdx1 = cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
+ .writeInt("age", 55).create();
+ employees.put("1", pdx1);
+
+ Awaitility.await().atMost(30, TimeUnit.SECONDS)
+ .until(() -> assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+ ResultSet rs = stmt.executeQuery("select * from " + regionTableName + " order by id asc");
+ assertThat(rs.next()).isTrue();
+ assertThat(rs.getString("id")).isEqualTo("1");
+ assertThat(rs.getString("name")).isEqualTo("Emp1");
+ assertThat(rs.getObject("age")).isEqualTo(55);
+ assertThat(rs.next()).isFalse();
+ }
+
private Region createRegionWithJDBCAsyncWriter(String regionName, Properties props) {
jdbcWriter = new JDBCAsyncWriter();
jdbcWriter.init(props);
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].