This is an automated email from the ASF dual-hosted git repository.
lgallinat pushed a commit to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-3781 by this
push:
new bdab7eb Fix NPEs with missing table and connection configuration.
Add DUnit test to simulate end-to-end testing using gfsh commands. Made
changes to use region name as table name when table name is not provided.
bdab7eb is described below
commit bdab7ebbdb4f46f709387aa39d7bc0ba8eafc822
Author: Lynn Gallinat <[email protected]>
AuthorDate: Thu Dec 14 17:36:37 2017 -0800
Fix NPEs with missing table and connection configuration.
Add DUnit test to simulate end-to-end testing using gfsh commands.
Made changes to use region name as table name when table name is not
provided.
---
.../geode/connectors/jdbc/JdbcAsyncWriter.java | 10 +
.../jdbc/internal/ConnectionManager.java | 3 +-
.../connectors/jdbc/internal/RegionMapping.java | 12 +-
.../geode/connectors/jdbc/internal/SqlHandler.java | 16 +-
.../jdbc/JdbcAsyncWriterIntegrationTest.java | 2 +-
.../geode/connectors/jdbc/JdbcDUnitTest.java | 310 +++++++++++++++++++++
.../jdbc/internal/ConnectionManagerUnitTest.java | 1 +
.../connectors/jdbc/internal/SqlHandlerTest.java | 1 +
8 files changed, 350 insertions(+), 5 deletions(-)
diff --git
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
index 465e1c6..8f83ecb 100644
---
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
+++
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
@@ -41,6 +41,7 @@ public class JdbcAsyncWriter extends AbstractJdbcCallback
implements AsyncEventL
private AtomicLong totalEvents = new AtomicLong();
private AtomicLong successfulEvents = new AtomicLong();
+ private AtomicLong failedEvents = new AtomicLong();
@SuppressWarnings("unused")
public JdbcAsyncWriter() {
@@ -68,6 +69,7 @@ public class JdbcAsyncWriter extends AbstractJdbcCallback
implements AsyncEventL
getPdxInstance(event));
changeSuccessfulEvents(1);
} catch (RuntimeException ex) {
+ changeFailedEvents(1);
logger.error("Exception processing event {}", event, ex);
}
}
@@ -86,10 +88,18 @@ public class JdbcAsyncWriter extends AbstractJdbcCallback
implements AsyncEventL
return successfulEvents.get();
}
+ long getFailedEvents() {
+ return failedEvents.get();
+ }
+
private void changeSuccessfulEvents(long delta) {
successfulEvents.addAndGet(delta);
}
+ private void changeFailedEvents(long delta) {
+ failedEvents.addAndGet(delta);
+ }
+
private void changeTotalEvents(long delta) {
totalEvents.addAndGet(delta);
}
diff --git
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java
index 8e7795b..b6fc8b1 100644
---
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java
+++
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java
@@ -59,7 +59,8 @@ class ConnectionManager {
<K> List<ColumnValue> getColumnToValueList(ConnectionConfiguration config,
RegionMapping regionMapping, K key, PdxInstance value, Operation
operation) {
- String keyColumnName = getKeyColumnName(config,
regionMapping.getTableName());
+ String tableName = regionMapping.getRegionToTableName();
+ String keyColumnName = getKeyColumnName(config, tableName);
ColumnValue keyColumnValue = new ColumnValue(true, keyColumnName, key);
if (operation.isDestroy() || operation.isGet()) {
diff --git
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/RegionMapping.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/RegionMapping.java
index 710bbbf..204ba37 100644
---
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/RegionMapping.java
+++
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/RegionMapping.java
@@ -61,8 +61,18 @@ public class RegionMapping implements Serializable {
return primaryKeyInValue;
}
+ public String getRegionToTableName() {
+ if (tableName == null) {
+ return regionName;
+ }
+ return tableName;
+ }
+
public String getColumnNameForField(String fieldName) {
- String columnName = fieldToColumnMap.get(fieldName);
+ String columnName = null;
+ if (fieldToColumnMap != null) {
+ columnName = fieldToColumnMap.get(fieldName);
+ }
return columnName != null ? columnName : fieldName;
}
diff --git
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
index 34703fc..240ff3f 100644
---
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
+++
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
@@ -51,7 +51,7 @@ public class SqlHandler {
List<ColumnValue> columnList =
manager.getColumnToValueList(connectionConfig, regionMapping, key,
null, Operation.GET);
- String tableName = regionMapping.getTableName();
+ String tableName = regionMapping.getRegionToTableName();
PreparedStatement statement = manager.getPreparedStatement(
manager.getConnection(connectionConfig), columnList, tableName,
Operation.GET, 0);
PdxInstanceFactory factory = getPdxInstanceFactory(region, regionMapping);
@@ -126,12 +126,24 @@ public class SqlHandler {
throw new IllegalArgumentException("PdxInstance cannot be null for
non-destroy operations");
}
RegionMapping regionMapping =
manager.getMappingForRegion(region.getName());
- final String tableName = regionMapping.getTableName();
+
+ if (regionMapping == null) {
+ throw new IllegalStateException("JDBC write failed. JDBC mapping for
region " +
+ region.getFullPath() +
+ " not found. Create mapping with gfsh command 'create
jdbc-mapping'.");
+ }
ConnectionConfiguration connectionConfig =
manager.getConnectionConfig(regionMapping.getConnectionConfigName());
+ if (connectionConfig == null) {
+ throw new IllegalStateException(
+ "JDBC write failed. JDBC Connection configuration for connection
name " +
+ regionMapping.getConnectionConfigName() + " not found.");
+ }
+
List<ColumnValue> columnList =
manager.getColumnToValueList(connectionConfig, regionMapping, key,
value, operation);
+ String tableName = regionMapping.getRegionToTableName();
int pdxTypeId = value == null ? 0 : ((PdxInstanceImpl)
value).getPdxType().getTypeId();
PreparedStatement statement = manager.getPreparedStatement(
manager.getConnection(connectionConfig), columnList, tableName,
operation, pdxTypeId);
diff --git
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
index 2d52f34..2aacfbe 100644
---
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
+++
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
@@ -137,7 +137,7 @@ public class JdbcAsyncWriterIntegrationTest {
awaitUntil(() -> assertThat(jdbcWriter.getTotalEvents()).isEqualTo(1));
- assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(0);
+ awaitUntil(() -> assertThat(jdbcWriter.getFailedEvents()).isEqualTo(1));
}
@Test
diff --git
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
new file mode 100644
index 0000000..a70531c
--- /dev/null
+++
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcDUnitTest.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.io.Serializable;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.TransactionException;
+import org.apache.geode.pdx.PdxInstance;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.rules.LocatorServerStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+
+@Category(DistributedTest.class)
+public class JdbcDUnitTest implements Serializable {
+
+ private static final String DB_NAME = "DerbyDB";
+ private static final String TABLE_NAME = "employees";
+ private static final String REGION_NAME = "employees";
+ private static final String CONNECTION_URL = "jdbc:derby:memory:" + DB_NAME
+ ";create=true";
+ private static final String CONNECTION_NAME = "TestConnection";
+
+ @Rule
+ public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+ @Rule
+ public LocatorServerStartupRule startupRule = new LocatorServerStartupRule();
+
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
+
+ private MemberVM locator;
+ private MemberVM server;
+
+ @Before
+ public void setup() throws Exception {
+ locator = startupRule.startLocatorVM(0);
+ gfsh.connectAndVerify(locator);
+ server = startupRule.startServerVM(1, locator.getPort());
+ server.invoke(()-> {
+ createTable();
+ });
+ }
+
+ private void createTable() throws SQLException {
+ Connection connection = DriverManager.getConnection(CONNECTION_URL);
+ Statement statement = connection.createStatement();
+ statement.execute("Create Table " + TABLE_NAME
+ + " (id varchar(10) primary key not null, name varchar(10), age int)");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ server.invoke(() -> {
+ CacheFactory.getAnyInstance().close();
+ closeDB();
+ });
+ }
+
+ private void closeDB() throws Exception {
+ try {
+ Connection connection = DriverManager.getConnection(CONNECTION_URL);
+ Statement statement = connection.createStatement();
+ if (statement == null) {
+ statement = connection.createStatement();
+ }
+ statement.execute("Drop table " + TABLE_NAME);
+ statement.close();
+
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (SQLException ex) {
+ System.out.println("SQL Exception is thrown while closing the
database.");
+ }
+ }
+
+ @Test
+ public void throwsExceptionWhenNoMappingExistsUsingWriter() throws Exception
{
+ createRegion(true, false, false);
+ createJdbcConnection();
+
+ server.invoke(() -> {
+ PdxInstance pdxEmployee1 =
+
CacheFactory.getAnyInstance().createPdxInstanceFactory(Employee.class.getName())
+ .writeString("name", "Emp1").writeInt("age", 55).create();
+ Region region = CacheFactory.getAnyInstance().getRegion(REGION_NAME);
+ assertThatThrownBy(() -> region.put("key1",
pdxEmployee1)).isExactlyInstanceOf(IllegalStateException.class).hasMessageContaining("JDBC
write failed");
+ });
+ }
+
+ @Test
+ public void throwsExceptionWhenNoMappingExistsUsingAsyncWriter() throws
Exception {
+ IgnoredException.addIgnoredException("IllegalStateException");
+ createRegion(false, true, false);
+ createJdbcConnection();
+
+ server.invoke(() -> {
+ PdxInstance pdxEmployee1 =
+
CacheFactory.getAnyInstance().createPdxInstanceFactory(Employee.class.getName())
+ .writeString("name", "Emp1").writeInt("age", 55).create();
+ Region region = CacheFactory.getAnyInstance().getRegion(REGION_NAME);
+ region.put("key1", pdxEmployee1);
+
+ JdbcAsyncWriter asyncWriter =
(JdbcAsyncWriter)CacheFactory.getAnyInstance().getAsyncEventQueue("JAW").getAsyncEventListener();
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+ assertThat(asyncWriter.getFailedEvents()).isEqualTo(1);
+ });
+
+ });
+ }
+
+ @Test
+ public void throwsExceptionWhenNoMappingMatches() throws Exception {
+ createRegion(true, false, false);
+ createJdbcConnection();
+ createMapping("NoSuchRegion", CONNECTION_NAME);
+
+ server.invoke(() -> {
+ PdxInstance pdxEmployee1 =
+
CacheFactory.getAnyInstance().createPdxInstanceFactory(Employee.class.getName())
+ .writeString("name", "Emp1").writeInt("age", 55).create();
+ Region region = CacheFactory.getAnyInstance().getRegion(REGION_NAME);
+ assertThatThrownBy(() -> region.put("key1",
pdxEmployee1)).isExactlyInstanceOf(IllegalStateException.class).hasMessageContaining("JDBC
write failed");
+ });
+ }
+
+ @Test
+ public void throwsExceptionWhenNoConnectionExists() throws Exception {
+ createRegion(true, false, false);
+ createMapping(REGION_NAME, CONNECTION_NAME);
+
+ server.invoke(() -> {
+ PdxInstance pdxEmployee1 =
+
CacheFactory.getAnyInstance().createPdxInstanceFactory(Employee.class.getName())
+ .writeString("name", "Emp1").writeInt("age", 55).create();
+ Region region = CacheFactory.getAnyInstance().getRegion(REGION_NAME);
+ assertThatThrownBy(() -> region.put("key1",
pdxEmployee1)).isExactlyInstanceOf(IllegalStateException.class).hasMessageContaining("JDBC
write failed"); });
+ }
+
+ @Test
+ public void putWritesToDB() throws Exception {
+ createRegion(true, false, false);
+ createJdbcConnection();
+ createMapping(REGION_NAME, CONNECTION_NAME);
+ server.invoke(() -> {
+ PdxInstance pdxEmployee1 =
+
CacheFactory.getAnyInstance().createPdxInstanceFactory(Employee.class.getName())
+ .writeString("id", "key1")
+ .writeString("name", "Emp1")
+ .writeInt("age", 55).create();
+
+ String key = "emp1";
+ CacheFactory.getAnyInstance().getRegion(REGION_NAME).put(key,
pdxEmployee1);
+ assertTableHasEmployeeData(1, pdxEmployee1, key);
+ });
+ }
+
+ @Test
+ public void putAsyncWritesToDB() throws Exception {
+ createRegion(true, false, false);
+ createJdbcConnection();
+ createMapping(REGION_NAME, CONNECTION_NAME);
+ server.invoke(() -> {
+ PdxInstance pdxEmployee1 =
+
CacheFactory.getAnyInstance().createPdxInstanceFactory(Employee.class.getName())
+ .writeString("id", "key1")
+ .writeString("name", "Emp1")
+ .writeInt("age", 55).create();
+
+ String key = "emp1";
+ CacheFactory.getAnyInstance().getRegion(REGION_NAME).put(key,
pdxEmployee1);
+ assertTableHasEmployeeData(1, pdxEmployee1, key);
+ });
+ }
+
+ @Test
+ public void getReadsFromEmptyDB() throws Exception {
+ createRegion(false, false, true);
+ createJdbcConnection();
+ createMapping(REGION_NAME, CONNECTION_NAME);
+ server.invoke(() -> {
+ String key = "emp1";
+ Region region = CacheFactory.getAnyInstance().getRegion(REGION_NAME);
+ region.get(key);
+ assertThat(region.size()).isEqualTo(0);
+ });
+ }
+
+ @Test
+ public void getReadsFromDB() throws Exception {
+ createRegion(true, false, true);
+ createJdbcConnection();
+ createMapping(REGION_NAME, CONNECTION_NAME);
+ server.invoke(() -> {
+ PdxInstance pdxEmployee1 =
+
CacheFactory.getAnyInstance().createPdxInstanceFactory(Employee.class.getName())
+ .writeString("id", "id1")
+ .writeString("name", "Emp1")
+ .writeInt("age", 55).create();
+
+ String key = "id1";
+ Region region = CacheFactory.getAnyInstance().getRegion(REGION_NAME);
+ region.put(key, pdxEmployee1);
+ region.invalidate(key);
+
+ PdxInstance result = (PdxInstance)region.get(key);
+ assertThat(result.getField("id")).isEqualTo(pdxEmployee1.getField("id"));
+
assertThat(result.getField("name")).isEqualTo(pdxEmployee1.getField("name"));
+
assertThat(result.getField("age")).isEqualTo(pdxEmployee1.getField("age"));
+ });
+ }
+
+ private void createJdbcConnection() {
+ final String commandStr = "create jdbc-connection --name=" +
CONNECTION_NAME + " --url="
+ + CONNECTION_URL
+ + "
--params=param1:value1,this.is.param2:value.2,this-is-value-3,value-3,this_is_param_4:value_4";
+ gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
+ }
+
+ private void createAsyncListener(String id) {
+ final String commandStr = "create async-event-queue --id=" + id
+ + " --listener=" + JdbcAsyncWriter.class.getName()
+ + " --batch-size=1 --batch-time-interval=0 --parallel=false";
+ gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
+ }
+
+ private void createRegion(boolean withCacheWriter, boolean withAsyncWriter,
boolean withLoader) {
+ StringBuffer createRegionCmd = new StringBuffer();
+ createRegionCmd.append("create region --name=" + REGION_NAME + "
--type=REPLICATE ");
+ if (withCacheWriter) {
+ createRegionCmd.append(" --cache-writer=" + JdbcWriter.class.getName());
+ }
+ if (withLoader) {
+ createRegionCmd.append(" --cache-loader=" + JdbcLoader.class.getName());
+ }
+ if (withAsyncWriter) {
+ createAsyncListener("JAW");
+ createRegionCmd.append(" --async-event-queue-id=JAW");
+ }
+
+ gfsh.executeAndAssertThat(createRegionCmd.toString()).statusIsSuccess();
+ }
+
+ private void createMapping(String regionName, String connectionName) {
+ final String commandStr =
+ "create jdbc-mapping --region=" + regionName + " --connection=" +
connectionName;
+ gfsh.executeAndAssertThat(commandStr).statusIsSuccess();
+ }
+
+ private void assertTableHasEmployeeData(int size, PdxInstance employee,
String key)
+ throws SQLException {
+ Connection connection = DriverManager.getConnection(CONNECTION_URL);
+ Statement statement = connection.createStatement();
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> {
+ assertThat(getRowCount(statement, TABLE_NAME)).isEqualTo(size);
+ });
+
+ ResultSet resultSet = statement.executeQuery("select * from " +
REGION_NAME + " order by id asc");
+ assertThat(resultSet.next()).isTrue();
+ assertThat(resultSet.getString("id")).isEqualTo(key);
+
assertThat(resultSet.getString("name")).isEqualTo(employee.getField("name"));
+ assertThat(resultSet.getObject("age")).isEqualTo(employee.getField("age"));
+ }
+
+ private int getRowCount(Statement stmt, String tableName) {
+ try {
+ ResultSet resultSet =
+ stmt.executeQuery("select count(*) from " + tableName);
+ resultSet.next();
+ return resultSet.getInt(1);
+ } catch (SQLException e){
+ return -1;
+ }
+ }
+
+}
diff --git
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ConnectionManagerUnitTest.java
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ConnectionManagerUnitTest.java
index 947559d..9478b19 100644
---
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ConnectionManagerUnitTest.java
+++
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/ConnectionManagerUnitTest.java
@@ -68,6 +68,7 @@ public class ConnectionManagerUnitTest {
connectionConfig = new ConnectionConfiguration("name", "url", null, null,
null);
when(mapping.getTableName()).thenReturn(TABLE_NAME);
+ when(mapping.getRegionToTableName()).thenReturn(TABLE_NAME);
doReturn(connection).when(manager).getSQLConnection(connectionConfig);
key = new Object();
diff --git
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
index 7cd623b..05dcdb9 100644
---
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
+++
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/internal/SqlHandlerTest.java
@@ -334,6 +334,7 @@ public class SqlHandlerTest {
regionMapping = mock(RegionMapping.class);
when(regionMapping.getRegionName()).thenReturn(REGION_NAME);
when(regionMapping.getTableName()).thenReturn(TABLE_NAME);
+ when(regionMapping.getRegionToTableName()).thenReturn(TABLE_NAME);
when(manager.getMappingForRegion(any())).thenReturn(regionMapping);
List<ColumnValue> columnList = new ArrayList<>();
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].