This is an automated email from the ASF dual-hosted git repository.
nreich pushed a commit to branch feature/GEODE-3781
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-3781 by this
push:
new 76abaff Wired jdbc loaders and writers to the connector service and
cleaned up tests
76abaff is described below
commit 76abaffd4eb3f31ca61df0969610e9e52bbddc7e
Author: Nick Reich <[email protected]>
AuthorDate: Thu Nov 16 15:19:09 2017 -0800
Wired jdbc loaders and writers to the connector service and cleaned up tests
---
.../connectors/jdbc/AbstractJdbcCallback.java | 65 +++++
.../geode/connectors/jdbc/JdbcAsyncWriter.java | 90 +++----
.../apache/geode/connectors/jdbc/JdbcLoader.java | 42 +--
.../connectors/jdbc/JdbcSynchronousWriter.java | 82 +++---
.../jdbc/internal/ConnectionManager.java | 4 +-
.../internal/InternalJdbcConnectorService.java | 5 +
.../geode/connectors/jdbc/internal/SqlHandler.java | 4 +
.../connectors/jdbc/AbstractJdbcCallbackTest.java | 71 +++++
.../org/apache/geode/connectors/jdbc/Employee.java | 54 ++++
.../jdbc/JdbcAsyncWriterIntegrationTest.java | 291 ++++++++-------------
.../geode/connectors/jdbc/JdbcAsyncWriterTest.java | 89 +++++++
.../connectors/jdbc/JdbcLoaderIntegrationTest.java | 49 ++--
.../geode/connectors/jdbc/JdbcLoaderTest.java | 40 +++
.../jdbc/JdbcSynchronousWriterIntegrationTest.java | 268 +++++++------------
.../connectors/jdbc/JdbcSynchronousWriterTest.java | 115 ++++++++
15 files changed, 758 insertions(+), 511 deletions(-)
diff --git
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/AbstractJdbcCallback.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/AbstractJdbcCallback.java
new file mode 100644
index 0000000..bb0c2c5
--- /dev/null
+++
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/AbstractJdbcCallback.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the
License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * *
Unless required by
+ * applicable law or agreed to in writing, software distributed under the
License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express * or implied.
+ * See the License for the specific language governing permissions and
limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc;
+
+import java.util.Properties;
+
+import org.apache.geode.cache.CacheCallback;
+import org.apache.geode.connectors.jdbc.internal.ConnectionManager;
+import org.apache.geode.connectors.jdbc.internal.InternalJdbcConnectorService;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalCache;
+
+abstract class AbstractJdbcCallback implements CacheCallback {
+
+ private volatile SqlHandler sqlHandler;
+
+ AbstractJdbcCallback() {
+ // nothing
+ }
+
+ AbstractJdbcCallback(SqlHandler sqlHandler) {
+ this.sqlHandler = sqlHandler;
+ }
+
+ @Override
+ public void close() {
+ if (sqlHandler != null) {
+ sqlHandler.close();
+ }
+ }
+
+ @Override
+ public void init(Properties props) {
+ // nothing
+ }
+
+ SqlHandler getSqlHandler() {
+ return sqlHandler;
+ }
+
+ void checkInitialized(InternalCache cache) {
+ if (sqlHandler == null) {
+ initialize(cache);
+ }
+ }
+
+ private synchronized void initialize(InternalCache cache) {
+ if (sqlHandler == null) {
+ InternalJdbcConnectorService service =
cache.getService(InternalJdbcConnectorService.class);
+ ConnectionManager manager = new ConnectionManager(service);
+ sqlHandler = new SqlHandler(manager);
+ }
+ }
+}
diff --git
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
index a009658..dfc8297 100644
---
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
+++
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriter.java
@@ -15,100 +15,90 @@
package org.apache.geode.connectors.jdbc;
import java.util.List;
-import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.logging.log4j.Logger;
import org.apache.geode.CopyHelper;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.cache.asyncqueue.AsyncEventListener;
import org.apache.geode.cache.query.internal.DefaultQuery;
-import org.apache.geode.connectors.jdbc.internal.ConnectionManager;
import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.pdx.PdxInstance;
-import org.apache.logging.log4j.Logger;
-/*
+/**
* This class provides write behind cache semantics for a JDBC data source
using AsyncEventListener.
*
* @since Geode 1.4
*/
-public class JdbcAsyncWriter implements AsyncEventListener {
- static final Logger logger = LogService.getLogger();
- private long totalEvents = 0;
- private long successfulEvents = 0;
- private ConnectionManager manager;
- private SqlHandler sqlHandler;
+public class JdbcAsyncWriter extends AbstractJdbcCallback implements
AsyncEventListener {
+ private static final Logger logger = LogService.getLogger();
- // Constructor for test purposes only
- JdbcAsyncWriter(ConnectionManager manager) {
- this.manager = manager;
- sqlHandler = new SqlHandler(manager);
- }
+ private AtomicLong totalEvents = new AtomicLong();
+ private AtomicLong successfulEvents = new AtomicLong();
- @Override
- public void close() {
- if (this.manager != null) {
- this.manager.close();
- }
+ @SuppressWarnings("unused")
+ public JdbcAsyncWriter() {
+ super();
}
- /**
- * precondition: DefaultQuery.setPdxReadSerialized(true)
- */
- @SuppressWarnings("rawtypes")
- private PdxInstance getPdxInstance(AsyncEvent event) {
- Object v = event.getDeserializedValue();
- if (!(v instanceof PdxInstance)) {
- v = CopyHelper.copy(v);
- }
- return (PdxInstance) v;
+ // Constructor for test purposes only
+ JdbcAsyncWriter(SqlHandler sqlHandler) {
+ super(sqlHandler);
}
-
- @SuppressWarnings("rawtypes")
@Override
public boolean processEvents(List<AsyncEvent> events) {
changeTotalEvents(events.size());
- // TODO: have a better API that lets you do this
+
+ if (!events.isEmpty()) {
+ checkInitialized((InternalCache)
events.get(0).getRegion().getRegionService());
+ }
+
DefaultQuery.setPdxReadSerialized(true);
try {
for (AsyncEvent event : events) {
try {
- sqlHandler.write(event.getRegion(), event.getOperation(),
event.getKey(),
+ getSqlHandler().write(event.getRegion(), event.getOperation(),
event.getKey(),
getPdxInstance(event));
changeSuccessfulEvents(1);
} catch (RuntimeException ex) {
- // TODO improve the following logging
- logger.error("Exception processing event " + event, ex);
+ logger.error("Exception processing event {}", event, ex);
}
}
} finally {
DefaultQuery.setPdxReadSerialized(false);
}
+
return true;
}
- @Override
- public void init(Properties props) {
- /*
- * JDBCConfiguration config = new JDBCConfiguration(props); this.manager =
new
- * ConnectionManager(config);
- */
+ long getTotalEvents() {
+ return totalEvents.get();
}
- private synchronized void changeTotalEvents(long delta) {
- this.totalEvents += delta;
+ long getSuccessfulEvents() {
+ return successfulEvents.get();
}
- public synchronized long getTotalEvents() {
- return this.totalEvents;
+ private void changeSuccessfulEvents(long delta) {
+ successfulEvents.addAndGet(delta);
}
- private synchronized void changeSuccessfulEvents(long delta) {
- this.successfulEvents += delta;
+ private void changeTotalEvents(long delta) {
+ totalEvents.addAndGet(delta);
}
- public synchronized long getSuccessfulEvents() {
- return this.successfulEvents;
+ /**
+ * precondition: DefaultQuery.setPdxReadSerialized(true)
+ */
+ private PdxInstance getPdxInstance(AsyncEvent event) {
+ Object value = event.getDeserializedValue();
+ if (!(value instanceof PdxInstance)) {
+ value = CopyHelper.copy(value);
+ }
+ return (PdxInstance) value;
}
}
diff --git
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcLoader.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcLoader.java
index 2fec773..aed38f1 100644
---
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcLoader.java
+++
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcLoader.java
@@ -14,54 +14,38 @@
*/
package org.apache.geode.connectors.jdbc;
-import org.apache.geode.cache.LoaderHelper;
-import org.apache.geode.connectors.jdbc.internal.ConnectionManager;
-
-import java.util.Properties;
-
import org.apache.geode.cache.CacheLoader;
import org.apache.geode.cache.CacheLoaderException;
+import org.apache.geode.cache.LoaderHelper;
import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalCache;
-/*
+/**
* This class provides loading from a data source using JDBC.
*
* @since Geode 1.4
*/
-public class JdbcLoader<K, V> implements CacheLoader<K, V> {
- private ConnectionManager manager;
- private SqlHandler sqlHandler;
+public class JdbcLoader<K, V> extends AbstractJdbcCallback implements
CacheLoader<K, V> {
- // Constructor for test purposes only
- JdbcLoader(ConnectionManager manager) {
- this.manager = manager;
- sqlHandler = new SqlHandler(manager);
+ @SuppressWarnings("unused")
+ public JdbcLoader() {
+ super();
}
- @Override
- public void close() {
- if (this.manager != null) {
- this.manager.close();
- }
+ // Constructor for test purposes only
+ JdbcLoader(SqlHandler sqlHandler) {
+ super(sqlHandler);
}
- @SuppressWarnings("unchecked")
- @Override
/**
* @return this method always returns a PdxInstance. It does not matter what
the V generic
* parameter is set to.
*/
+ @Override
public V load(LoaderHelper<K, V> helper) throws CacheLoaderException {
// The following cast to V is to keep the compiler happy
// but is erased at runtime and no actual cast happens.
- return (V) sqlHandler.read(helper.getRegion(), helper.getKey());
+ checkInitialized((InternalCache) helper.getRegion().getRegionService());
+ return (V) getSqlHandler().read(helper.getRegion(), helper.getKey());
}
-
- public void init(Properties props) {
- /*
- * JDBCConfiguration config = new JDBCConfiguration(props); this.manager =
new
- * ConnectionManager(config);
- */
- // TODO: make this get the JdbcConnectorService?
- };
}
diff --git
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriter.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriter.java
index cf16152..6440bf5 100644
---
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriter.java
+++
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriter.java
@@ -14,8 +14,6 @@
*/
package org.apache.geode.connectors.jdbc;
-import java.util.Properties;
-
import org.apache.geode.CopyHelper;
import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.CacheWriterException;
@@ -23,74 +21,46 @@ import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.SerializedCacheValue;
import org.apache.geode.cache.query.internal.DefaultQuery;
-import org.apache.geode.connectors.jdbc.internal.ConnectionManager;
import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.pdx.PdxInstance;
-/*
+/**
* This class provides synchronous write through to a data source using JDBC.
*
* @since Geode 1.4
*/
-public class JdbcSynchronousWriter<K, V> implements CacheWriter<K, V> {
- private ConnectionManager manager;
- private SqlHandler sqlHandler;
-
- // Constructor for test purposes only
- JdbcSynchronousWriter(ConnectionManager manager) {
- this.manager = manager;
- sqlHandler = new SqlHandler(manager);
- }
+public class JdbcSynchronousWriter<K, V> extends AbstractJdbcCallback
implements CacheWriter<K, V> {
- @Override
- public void init(Properties props) {
- /*
- * JDBCConfiguration config = new JDBCConfiguration(props); this.manager =
new
- * ConnectionManager(config);
- */
+ @SuppressWarnings("unused")
+ public JdbcSynchronousWriter() {
+ super();
}
- @Override
- public void close() {
- if (this.manager != null) {
- this.manager.close();
- }
+ // Constructor for test purposes only
+ JdbcSynchronousWriter(SqlHandler sqlHandler) {
+ super(sqlHandler);
}
- private PdxInstance getPdxNewValue(EntryEvent<K, V> event) {
- // TODO: have a better API that lets you do this
- DefaultQuery.setPdxReadSerialized(true);
- try {
- Object v = event.getNewValue();
- if (!(v instanceof PdxInstance)) {
- SerializedCacheValue<V> sv = event.getSerializedNewValue();
- if (sv != null) {
- v = sv.getDeserializedValue();
- } else {
- v = CopyHelper.copy(v);
- }
- }
- return (PdxInstance) v;
- } finally {
- DefaultQuery.setPdxReadSerialized(false);
- }
- }
@Override
public void beforeUpdate(EntryEvent<K, V> event) throws CacheWriterException
{
- sqlHandler.write(event.getRegion(), event.getOperation(), event.getKey(),
+ checkInitialized((InternalCache) event.getRegion().getRegionService());
+ getSqlHandler().write(event.getRegion(), event.getOperation(),
event.getKey(),
getPdxNewValue(event));
}
@Override
public void beforeCreate(EntryEvent<K, V> event) throws CacheWriterException
{
- sqlHandler.write(event.getRegion(), event.getOperation(), event.getKey(),
+ checkInitialized((InternalCache) event.getRegion().getRegionService());
+ getSqlHandler().write(event.getRegion(), event.getOperation(),
event.getKey(),
getPdxNewValue(event));
}
@Override
public void beforeDestroy(EntryEvent<K, V> event) throws
CacheWriterException {
- sqlHandler.write(event.getRegion(), event.getOperation(), event.getKey(),
+ checkInitialized((InternalCache) event.getRegion().getRegionService());
+ getSqlHandler().write(event.getRegion(), event.getOperation(),
event.getKey(),
getPdxNewValue(event));
}
@@ -104,4 +74,26 @@ public class JdbcSynchronousWriter<K, V> implements
CacheWriter<K, V> {
// this event is not sent to JDBC
}
+ private PdxInstance getPdxNewValue(EntryEvent<K, V> event) {
+ DefaultQuery.setPdxReadSerialized(true);
+ try {
+ Object newValue = event.getNewValue();
+ if (!(newValue instanceof PdxInstance)) {
+ SerializedCacheValue<V> serializedNewValue =
event.getSerializedNewValue();
+ if (serializedNewValue != null) {
+ newValue = serializedNewValue.getDeserializedValue();
+ } else {
+ newValue = CopyHelper.copy(newValue);
+ }
+ if (newValue != null && !(newValue instanceof PdxInstance)) {
+ String valueClassName = newValue == null ? "null" :
newValue.getClass().getName();
+ throw new IllegalArgumentException(getClass().getSimpleName()
+ + " only supports PDX values; newValue is " + valueClassName);
+ }
+ }
+ return (PdxInstance) newValue;
+ } finally {
+ DefaultQuery.setPdxReadSerialized(false);
+ }
+ }
}
diff --git
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java
index 7746b49..a114da7 100644
---
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java
+++
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/ConnectionManager.java
@@ -32,7 +32,7 @@ import org.apache.geode.pdx.PdxInstance;
public class ConnectionManager {
- private final JdbcConnectorService configService;
+ private final InternalJdbcConnectorService configService;
private final Map<String, Connection> connectionMap = new
ConcurrentHashMap<>();
@@ -40,7 +40,7 @@ public class ConnectionManager {
private final ThreadLocal<PreparedStatementCache> preparedStatementCache =
new ThreadLocal<>();
- public ConnectionManager(JdbcConnectorService configService) {
+ public ConnectionManager(InternalJdbcConnectorService configService) {
this.configService = configService;
}
diff --git
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/InternalJdbcConnectorService.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/InternalJdbcConnectorService.java
index 881dec8..49938e4 100644
---
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/InternalJdbcConnectorService.java
+++
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/InternalJdbcConnectorService.java
@@ -21,4 +21,9 @@ public interface InternalJdbcConnectorService extends
Extension<Cache>, CacheSer
void addOrUpdateConnectionConfig(ConnectionConfiguration config);
void addOrUpdateRegionMapping(RegionMapping mapping);
+
+ ConnectionConfiguration getConnectionConfig(String connectionName);
+
+ RegionMapping getMappingForRegion(String regionName);
+
}
diff --git
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
index 0bdcab2..5aefb20 100644
---
a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
+++
b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/internal/SqlHandler.java
@@ -33,6 +33,10 @@ public class SqlHandler {
this.manager = manager;
}
+ public void close() {
+ manager.close();
+ }
+
public PdxInstance read(Region region, Object key) {
if (key == null) {
throw new IllegalArgumentException("Key for query cannot be null");
diff --git
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/AbstractJdbcCallbackTest.java
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/AbstractJdbcCallbackTest.java
new file mode 100644
index 0000000..05b6334
--- /dev/null
+++
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/AbstractJdbcCallbackTest.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the
License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * *
Unless required by
+ * applicable law or agreed to in writing, software distributed under the
License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express * or implied.
+ * See the License for the specific language governing permissions and
limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.connectors.jdbc.internal.InternalJdbcConnectorService;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalCache;
+
+public class AbstractJdbcCallbackTest {
+
+ private AbstractJdbcCallback jdbcCallback;
+ private SqlHandler sqlHandler;
+
+ @Before
+ public void setUp() throws Exception {
+ sqlHandler = mock(SqlHandler.class);
+ jdbcCallback = new AbstractJdbcCallback(sqlHandler) {};
+ }
+
+ @Test
+ public void closesSqlHandler() throws Exception {
+ jdbcCallback.close();
+ verify(sqlHandler, times(1)).close();
+ }
+
+ @Test
+ public void returnsCorrectSqlHander() throws Exception {
+ assertThat(jdbcCallback.getSqlHandler()).isSameAs(sqlHandler);
+ }
+
+ @Test
+ public void checkInitializedDoesNothingIfInitialized() {
+ jdbcCallback.checkInitialized(mock(InternalCache.class));
+ assertThat(jdbcCallback.getSqlHandler()).isSameAs(sqlHandler);
+ }
+
+ @Test
+ public void initializedSqlHandlerIfNoneExists() {
+ jdbcCallback = new AbstractJdbcCallback() {};
+ InternalCache cache = mock(InternalCache.class);
+ InternalJdbcConnectorService service =
mock(InternalJdbcConnectorService.class);
+ when(cache.getService(any())).thenReturn(service);
+ assertThat(jdbcCallback.getSqlHandler()).isNull();
+
+ jdbcCallback.checkInitialized(cache);
+
+ assertThat(jdbcCallback.getSqlHandler()).isNotNull();
+ }
+}
diff --git
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/Employee.java
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/Employee.java
new file mode 100644
index 0000000..82e2e90
--- /dev/null
+++
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/Employee.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.connectors.jdbc;
+
+import org.apache.geode.pdx.PdxReader;
+import org.apache.geode.pdx.PdxSerializable;
+import org.apache.geode.pdx.PdxWriter;
+
+@SuppressWarnings("unused")
+public class Employee implements PdxSerializable {
+ private String name;
+ private int age;
+
+ public Employee() {
+ // nothing
+ }
+
+ Employee(String name, int age) {
+ this.name = name;
+ this.age = age;
+ }
+
+ String getName() {
+ return name;
+ }
+
+ int getAge() {
+ return age;
+ }
+
+ @Override
+ public void toData(PdxWriter writer) {
+ writer.writeString("name", this.name);
+ writer.writeInt("age", this.age);
+ }
+
+ @Override
+ public void fromData(PdxReader reader) {
+ this.name = reader.readString("name");
+ this.age = reader.readInt("age");
+ }
+}
diff --git
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
index 96e5328..cda4c2c 100644
---
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
+++
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterIntegrationTest.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.connectors.jdbc;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
import static org.assertj.core.api.Assertions.assertThat;
import java.sql.Connection;
@@ -33,33 +34,43 @@ import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
-import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.connectors.jdbc.internal.ConnectionManager;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
import org.apache.geode.connectors.jdbc.internal.TestConfigService;
import org.apache.geode.pdx.PdxInstance;
-import org.apache.geode.pdx.PdxReader;
-import org.apache.geode.pdx.PdxSerializable;
-import org.apache.geode.pdx.PdxSerializationException;
-import org.apache.geode.pdx.PdxWriter;
import org.apache.geode.test.junit.categories.IntegrationTest;
@Category(IntegrationTest.class)
public class JdbcAsyncWriterIntegrationTest {
+
+ private static final String DB_NAME = "DerbyDB";
+ private static final String REGION_TABLE_NAME = "employees";
+ private static final String CONNECTION_URL = "jdbc:derby:memory:" + DB_NAME
+ ";create=true";
+
private Cache cache;
+ private Region<String, PdxInstance> employees;
private Connection connection;
private Statement statement;
private JdbcAsyncWriter jdbcWriter;
- private String dbName = "DerbyDB";
- private String regionTableName = "employees";
- private String connectionURL = "jdbc:derby:memory:" + dbName +
";create=true";
+ private PdxInstance pdxEmployee1;
+ private PdxInstance pdxEmployee2;
+ private Employee employee1;
+ private Employee employee2;
@Before
public void setup() throws Exception {
cache = new CacheFactory().setPdxReadSerialized(false).create();
- connection = DriverManager.getConnection(connectionURL);
+ employees = createRegionWithJDBCAsyncWriter(REGION_TABLE_NAME);
+ connection = DriverManager.getConnection(CONNECTION_URL);
statement = connection.createStatement();
- statement.execute("Create Table " + regionTableName
+ statement.execute("Create Table " + REGION_TABLE_NAME
+ " (id varchar(10) primary key not null, name varchar(10), age int)");
+ pdxEmployee1 = cache.createPdxInstanceFactory(Employee.class.getName())
+ .writeString("name", "Emp1").writeInt("age", 55).create();
+ pdxEmployee2 = cache.createPdxInstanceFactory(Employee.class.getName())
+ .writeString("name", "Emp2").writeInt("age", 21).create();
+ employee1 = (Employee) pdxEmployee1.getObject();
+ employee2 = (Employee) pdxEmployee2.getObject();
}
@After
@@ -72,7 +83,7 @@ public class JdbcAsyncWriterIntegrationTest {
if (statement == null) {
statement = connection.createStatement();
}
- statement.execute("Drop table " + regionTableName);
+ statement.execute("Drop table " + REGION_TABLE_NAME);
statement.close();
if (connection != null) {
@@ -82,246 +93,162 @@ public class JdbcAsyncWriterIntegrationTest {
@Test
public void validateJDBCAsyncWriterTotalEvents() {
- Region<String, PdxInstance> employees =
createRegionWithJDBCAsyncWriter(regionTableName);
- PdxInstance pdx1 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
- .writeInt("age", 55).create();
- PdxInstance pdx2 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp2")
- .writeInt("age", 21).create();
- employees.put("1", pdx1);
- employees.put("2", pdx2);
+ employees.put("1", pdxEmployee1);
+ employees.put("2", pdxEmployee2);
- Awaitility.await().atMost(5, TimeUnit.SECONDS)
- .until(() -> assertThat(jdbcWriter.getTotalEvents()).isEqualTo(2));
+ awaitUntil(() -> assertThat(jdbcWriter.getTotalEvents()).isEqualTo(2));
}
@Test
public void canInsertIntoTable() throws Exception {
- Region<String, PdxInstance> employees =
createRegionWithJDBCAsyncWriter(regionTableName);
- PdxInstance pdx1 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
- .writeInt("age", 55).create();
- PdxInstance pdx2 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp2")
- .writeInt("age", 21).create();
- employees.put("1", pdx1);
- employees.put("2", pdx2);
-
- Awaitility.await().atMost(30, TimeUnit.SECONDS)
- .until(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
-
- ResultSet rs = statement.executeQuery("select * from " + regionTableName +
" order by id asc");
- assertThat(rs.next()).isTrue();
- assertThat(rs.getString("id")).isEqualTo("1");
- assertThat(rs.getString("name")).isEqualTo("Emp1");
- assertThat(rs.getObject("age")).isEqualTo(55);
- assertThat(rs.next()).isTrue();
- assertThat(rs.getString("id")).isEqualTo("2");
- assertThat(rs.getString("name")).isEqualTo("Emp2");
- assertThat(rs.getObject("age")).isEqualTo(21);
- assertThat(rs.next()).isFalse();
+ employees.put("1", pdxEmployee1);
+ employees.put("2", pdxEmployee2);
+
+ awaitUntil(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+
+ ResultSet resultSet =
+ statement.executeQuery("select * from " + REGION_TABLE_NAME + " order
by id asc");
+ assertRecordMatchesEmployee(resultSet, "1", employee1);
+ assertRecordMatchesEmployee(resultSet, "2", employee2);
+ assertThat(resultSet.next()).isFalse();
}
@Test
public void verifyThatPdxFieldNamedSameAsPrimaryKeyIsIgnored() throws
Exception {
- Region<String, PdxInstance> employees =
createRegionWithJDBCAsyncWriter(regionTableName);
PdxInstance pdx1 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
.writeInt("age", 55).writeInt("id", 3).create();
employees.put("1", pdx1);
- Awaitility.await().atMost(30, TimeUnit.SECONDS)
- .until(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+ awaitUntil(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
- ResultSet rs = statement.executeQuery("select * from " + regionTableName +
" order by id asc");
- assertThat(rs.next()).isTrue();
- assertThat(rs.getString("id")).isEqualTo("1");
- assertThat(rs.getString("name")).isEqualTo("Emp1");
- assertThat(rs.getObject("age")).isEqualTo(55);
- assertThat(rs.next()).isFalse();
+ ResultSet resultSet =
+ statement.executeQuery("select * from " + REGION_TABLE_NAME + " order
by id asc");
+ assertRecordMatchesEmployee(resultSet, "1", employee1);
+ assertThat(resultSet.next()).isFalse();
}
@Test
public void putNonPdxInstanceFails() {
- Region employees = createRegionWithJDBCAsyncWriter(regionTableName);
- employees.put("1", "non pdx instance");
+ Region nonPdxEmployees = this.employees;
+ nonPdxEmployees.put("1", "non pdx instance");
+
+ awaitUntil(() -> assertThat(jdbcWriter.getTotalEvents()).isEqualTo(1));
- Awaitility.await().atMost(30, TimeUnit.SECONDS)
- .until(() -> assertThat(jdbcWriter.getTotalEvents()).isEqualTo(1));
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(0);
}
@Test
public void putNonPdxInstanceThatIsPdxSerializable() throws SQLException {
- Region employees = createRegionWithJDBCAsyncWriter(regionTableName);
- Object value = new TestEmployee("Emp2", 22);
- employees.put("2", value);
-
- Awaitility.await().atMost(30, TimeUnit.SECONDS)
- .until(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
-
- ResultSet rs = statement.executeQuery("select * from " + regionTableName +
" order by id asc");
- assertThat(rs.next()).isTrue();
- assertThat(rs.getString("id")).isEqualTo("2");
- assertThat(rs.getString("name")).isEqualTo("Emp2");
- assertThat(rs.getObject("age")).isEqualTo(22);
- assertThat(rs.next()).isFalse();
+ Region nonPdxEmployees = this.employees;
+ Employee value = new Employee("Emp2", 22);
+ nonPdxEmployees.put("2", value);
+
+ awaitUntil(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+ ResultSet resultSet =
+ statement.executeQuery("select * from " + REGION_TABLE_NAME + " order
by id asc");
+ assertRecordMatchesEmployee(resultSet, "2", value);
+ assertThat(resultSet.next()).isFalse();
}
@Test
public void canDestroyFromTable() throws Exception {
- Region<String, PdxInstance> employees =
createRegionWithJDBCAsyncWriter(regionTableName);
- PdxInstance pdx1 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
- .writeInt("age", 55).create();
- PdxInstance pdx2 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp2")
- .writeInt("age", 21).create();
- employees.put("1", pdx1);
- employees.put("2", pdx2);
+ employees.put("1", pdxEmployee1);
+ employees.put("2", pdxEmployee2);
- Awaitility.await().atMost(30, TimeUnit.SECONDS)
- .until(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+ awaitUntil(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
- try {
- employees.destroy("1");
- } catch (PdxSerializationException ignore) {
- // destroy tries to deserialize old value
- // which does not work because our PdxInstance
- // does not have a real class
- }
+ employees.destroy("1");
- Awaitility.await().atMost(30, TimeUnit.SECONDS)
- .until(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(3));
+ awaitUntil(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(3));
- ResultSet rs = statement.executeQuery("select * from " + regionTableName +
" order by id asc");
- assertThat(rs.next()).isTrue();
- assertThat(rs.getString("id")).isEqualTo("2");
- assertThat(rs.getString("name")).isEqualTo("Emp2");
- assertThat(rs.getObject("age")).isEqualTo(21);
- assertThat(rs.next()).isFalse();
+ ResultSet resultSet =
+ statement.executeQuery("select * from " + REGION_TABLE_NAME + " order
by id asc");
+ assertRecordMatchesEmployee(resultSet, "2", employee2);
+ assertThat(resultSet.next()).isFalse();
}
@Test
public void canUpdateTable() throws Exception {
- Region<String, PdxInstance> employees =
createRegionWithJDBCAsyncWriter(regionTableName);
- PdxInstance pdx1 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
- .writeInt("age", 55).create();
- employees.put("1", pdx1);
+ employees.put("1", pdxEmployee1);
- Awaitility.await().atMost(30, TimeUnit.SECONDS)
- .until(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
-
- PdxInstance pdx3 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
- .writeInt("age", 72).create();
- try {
- employees.put("1", pdx3);
- } catch (PdxSerializationException ignore) {
- // put tries to deserialize old value
- // which does not work because our PdxInstance
- // does not have a real class
- }
+ awaitUntil(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+ employees.put("1", pdxEmployee2);
- Awaitility.await().atMost(30, TimeUnit.SECONDS)
- .until(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+ awaitUntil(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
- ResultSet rs = statement.executeQuery("select * from " + regionTableName +
" order by id asc");
- assertThat(rs.next()).isTrue();
- assertThat(rs.getObject("age")).isEqualTo(72);
- assertThat(rs.next()).isFalse();
+ ResultSet resultSet =
+ statement.executeQuery("select * from " + REGION_TABLE_NAME + " order
by id asc");
+ assertRecordMatchesEmployee(resultSet, "1", employee2);
+ assertThat(resultSet.next()).isFalse();
}
@Test
public void canUpdateBecomeInsert() throws Exception {
- Region<String, PdxInstance> employees =
createRegionWithJDBCAsyncWriter(regionTableName);
- PdxInstance pdx1 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
- .writeInt("age", 55).create();
- employees.put("1", pdx1);
+ employees.put("1", pdxEmployee1);
- Awaitility.await().atMost(30, TimeUnit.SECONDS)
- .until(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+ awaitUntil(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
- statement.execute("delete from " + regionTableName + " where id = '1'");
+ statement.execute("delete from " + REGION_TABLE_NAME + " where id = '1'");
validateTableRowCount(0);
- PdxInstance pdx3 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
- .writeInt("age", 72).create();
- try {
- employees.put("1", pdx3);
- } catch (PdxSerializationException ignore) {
- // put tries to deserialize old value
- // which does not work because our PdxInstance
- // does not have a real class
- }
+ employees.put("1", pdxEmployee2);
- Awaitility.await().atMost(10, TimeUnit.SECONDS)
- .until(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
+ awaitUntil(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(2));
- ResultSet rs = statement.executeQuery("select * from " + regionTableName +
" order by id asc");
- assertThat(rs.next()).isTrue();
- assertThat(rs.getString("id")).isEqualTo("1");
- assertThat(rs.getString("name")).isEqualTo("Emp1");
- assertThat(rs.getObject("age")).isEqualTo(72);
- assertThat(rs.next()).isFalse();
+ ResultSet resultSet =
+ statement.executeQuery("select * from " + REGION_TABLE_NAME + " order
by id asc");
+ assertRecordMatchesEmployee(resultSet, "1", employee2);
+ assertThat(resultSet.next()).isFalse();
}
@Test
public void canInsertBecomeUpdate() throws Exception {
- statement.execute("Insert into " + regionTableName + " values('1',
'bogus', 11)");
+ statement.execute("Insert into " + REGION_TABLE_NAME + " values('1',
'bogus', 11)");
validateTableRowCount(1);
- Region<String, PdxInstance> employees =
createRegionWithJDBCAsyncWriter(regionTableName);
- PdxInstance pdx1 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
- .writeInt("age", 55).create();
- employees.put("1", pdx1);
+ employees.put("1", pdxEmployee1);
- Awaitility.await().atMost(30, TimeUnit.SECONDS)
- .until(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+ awaitUntil(() ->
assertThat(jdbcWriter.getSuccessfulEvents()).isEqualTo(1));
+
+ ResultSet resultSet =
+ statement.executeQuery("select * from " + REGION_TABLE_NAME + " order
by id asc");
+ assertRecordMatchesEmployee(resultSet, "1", employee1);
+ assertThat(resultSet.next()).isFalse();
+ }
+
+ private void awaitUntil(final Runnable supplier) {
+ Awaitility.await().atMost(30, TimeUnit.SECONDS).until(supplier);
+ }
- ResultSet rs = statement.executeQuery("select * from " + regionTableName +
" order by id asc");
- assertThat(rs.next()).isTrue();
- assertThat(rs.getString("id")).isEqualTo("1");
- assertThat(rs.getString("name")).isEqualTo("Emp1");
- assertThat(rs.getObject("age")).isEqualTo(55);
- assertThat(rs.next()).isFalse();
+ private void assertRecordMatchesEmployee(ResultSet resultSet, String key,
Employee employee)
+ throws SQLException {
+ assertThat(resultSet.next()).isTrue();
+ assertThat(resultSet.getString("id")).isEqualTo(key);
+ assertThat(resultSet.getString("name")).isEqualTo(employee.getName());
+ assertThat(resultSet.getObject("age")).isEqualTo(employee.getAge());
}
private Region<String, PdxInstance> createRegionWithJDBCAsyncWriter(String
regionName) {
- jdbcWriter = new JdbcAsyncWriter(createManager());
+ jdbcWriter = new JdbcAsyncWriter(createSqlHandler());
cache.createAsyncEventQueueFactory().setBatchSize(1).setBatchTimeInterval(1)
.create("jdbcAsyncQueue", jdbcWriter);
- RegionFactory<String, PdxInstance> rf =
cache.createRegionFactory(RegionShortcut.REPLICATE);
- rf.addAsyncEventQueueId("jdbcAsyncQueue");
- return rf.create(regionName);
+ RegionFactory<String, PdxInstance> regionFactory =
cache.createRegionFactory(REPLICATE);
+ regionFactory.addAsyncEventQueueId("jdbcAsyncQueue");
+ return regionFactory.create(regionName);
}
private void validateTableRowCount(int expected) throws Exception {
- ResultSet rs = statement.executeQuery("select count(*) from " +
regionTableName);
- rs.next();
- int size = rs.getInt(1);
+ ResultSet resultSet = statement.executeQuery("select count(*) from " +
REGION_TABLE_NAME);
+ resultSet.next();
+ int size = resultSet.getInt(1);
assertThat(size).isEqualTo(expected);
}
- private ConnectionManager createManager() {
- return new ConnectionManager(TestConfigService.getTestConfigService());
+ private SqlHandler createSqlHandler() {
+ return new SqlHandler(new
ConnectionManager(TestConfigService.getTestConfigService()));
}
- public static class TestEmployee implements PdxSerializable {
- private String name;
- private int age;
-
- TestEmployee(String name, int age) {
- this.name = name;
- this.age = age;
- }
-
- @Override
- public void toData(PdxWriter writer) {
- writer.writeString("name", this.name);
- writer.writeInt("age", this.age);
- }
-
- @Override
- public void fromData(PdxReader reader) {
- this.name = reader.readString("name");
- this.age = reader.readInt("age");
- }
- }
-
-
}
diff --git
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterTest.java
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterTest.java
new file mode 100644
index 0000000..7a867fb
--- /dev/null
+++
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcAsyncWriterTest.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the
License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * *
Unless required by
+ * applicable law or agreed to in writing, software distributed under the
License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express * or implied.
+ * See the License for the specific language governing permissions and
limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalRegion;
+
+public class JdbcAsyncWriterTest {
+
+ private SqlHandler sqlHandler;
+ private JdbcAsyncWriter writer;
+
+ @Before
+ public void setup() {
+ sqlHandler = mock(SqlHandler.class);
+ writer = new JdbcAsyncWriter(sqlHandler);
+ }
+
+ @Test
+ public void throwsNullPointerExceptionIfGivenNullList() {
+ assertThatThrownBy(() ->
writer.processEvents(null)).isInstanceOf(NullPointerException.class);
+ }
+
+ @Test
+ public void doesNothingIfEventListIsEmpty() {
+ writer.processEvents(Collections.emptyList());
+
+ verifyZeroInteractions(sqlHandler);
+ assertThat(writer.getSuccessfulEvents()).isZero();
+ assertThat(writer.getTotalEvents()).isZero();
+ }
+
+ @Test
+ public void writesAProvidedEvent() {
+ writer.processEvents(Collections.singletonList(createMockEvent()));
+
+ verify(sqlHandler, times(1)).write(any(), any(), any(), any());
+ assertThat(writer.getSuccessfulEvents()).isEqualTo(1);
+ assertThat(writer.getTotalEvents()).isEqualTo(1);
+ }
+
+ @Test
+ public void writesMultipleProvidedEvents() {
+ List<AsyncEvent> events = new ArrayList<>();
+ events.add(createMockEvent());
+ events.add(createMockEvent());
+ events.add(createMockEvent());
+
+ writer.processEvents(events);
+
+ verify(sqlHandler, times(3)).write(any(), any(), any(), any());
+ assertThat(writer.getSuccessfulEvents()).isEqualTo(3);
+ assertThat(writer.getTotalEvents()).isEqualTo(3);
+ }
+
+ private AsyncEvent createMockEvent() {
+ AsyncEvent event = mock(AsyncEvent.class);
+ when(event.getRegion()).thenReturn(mock(InternalRegion.class));
+ return event;
+ }
+}
diff --git
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
index 06e56d2..7f9ddd6 100644
---
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
+++
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderIntegrationTest.java
@@ -32,26 +32,28 @@ import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.connectors.jdbc.internal.ConnectionManager;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
import org.apache.geode.connectors.jdbc.internal.TestConfigService;
import org.apache.geode.pdx.PdxInstance;
import org.apache.geode.test.junit.categories.IntegrationTest;
@Category(IntegrationTest.class)
public class JdbcLoaderIntegrationTest {
- private JdbcLoader<String, String> jdbcLoader;
+
+ private static final String DB_NAME = "DerbyDB";
+ private static final String REGION_TABLE_NAME = "employees";
+ private static final String CONNECTION_URL = "jdbc:derby:memory:" + DB_NAME
+ ";create=true";
+
private Cache cache;
private Connection connection;
private Statement statement;
- private String dbName = "DerbyDB";
- private String regionTableName = "employees";
- private String connectionURL = "jdbc:derby:memory:" + dbName +
";create=true";
@Before
public void setup() throws Exception {
cache = new CacheFactory().setPdxReadSerialized(false).create();
- connection = DriverManager.getConnection(connectionURL);
+ connection = DriverManager.getConnection(CONNECTION_URL);
statement = connection.createStatement();
- statement.execute("Create Table " + regionTableName
+ statement.execute("Create Table " + REGION_TABLE_NAME
+ " (id varchar(10) primary key not null, name varchar(10), age int)");
}
@@ -65,7 +67,7 @@ public class JdbcLoaderIntegrationTest {
if (statement == null) {
statement = connection.createStatement();
}
- statement.execute("Drop table " + regionTableName);
+ statement.execute("Drop table " + REGION_TABLE_NAME);
statement.close();
if (connection != null) {
@@ -73,32 +75,31 @@ public class JdbcLoaderIntegrationTest {
}
}
- private Region createRegionWithJDBCLoader(String regionName) {
- this.jdbcLoader = new JdbcLoader<>(createManager());
- RegionFactory<String, String> rf =
cache.createRegionFactory(RegionShortcut.REPLICATE);
- rf.setCacheLoader(jdbcLoader);
- return rf.create(regionName);
- }
-
@Test
public void verifySimpleGet() throws SQLException {
- statement.execute("Insert into " + regionTableName + " values('1', 'Emp1',
21)");
- Region region = createRegionWithJDBCLoader(this.regionTableName);
- Object result = region.get("1");
- assertThat(result).isNotNull();
- PdxInstance pdx = (PdxInstance) result;
+ statement.execute("Insert into " + REGION_TABLE_NAME + " values('1',
'Emp1', 21)");
+ Region<String, PdxInstance> region =
createRegionWithJDBCLoader(REGION_TABLE_NAME);
+ PdxInstance pdx = region.get("1");
+
assertThat(pdx.getField("name")).isEqualTo("Emp1");
assertThat(pdx.getField("age")).isEqualTo(21);
}
@Test
public void verifySimpleMiss() throws SQLException {
- Region region = createRegionWithJDBCLoader(this.regionTableName);
- Object result = region.get("1");
- assertThat(result).isNull();
+ Region<String, PdxInstance> region =
createRegionWithJDBCLoader(REGION_TABLE_NAME);
+ PdxInstance pdx = region.get("1");
+ assertThat(pdx).isNull();
+ }
+
+ private SqlHandler createSqlHandler() {
+ return new SqlHandler(new
ConnectionManager(TestConfigService.getTestConfigService()));
}
- private ConnectionManager createManager() {
- return new ConnectionManager(TestConfigService.getTestConfigService());
+ private Region<String, PdxInstance> createRegionWithJDBCLoader(String
regionName) {
+ JdbcLoader<String, PdxInstance> jdbcLoader = new
JdbcLoader<>(createSqlHandler());
+ RegionFactory<String, PdxInstance> rf =
cache.createRegionFactory(RegionShortcut.REPLICATE);
+ rf.setCacheLoader(jdbcLoader);
+ return rf.create(regionName);
}
}
diff --git
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderTest.java
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderTest.java
new file mode 100644
index 0000000..9817c37
--- /dev/null
+++
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcLoaderTest.java
@@ -0,0 +1,40 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the
License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * *
Unless required by
+ * applicable law or agreed to in writing, software distributed under the
License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express * or implied.
+ * See the License for the specific language governing permissions and
limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+
+import org.apache.geode.cache.LoaderHelper;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalRegion;
+
+public class JdbcLoaderTest {
+
+ @Test
+ public void loadReadsFromSqlHandler() {
+ SqlHandler sqlHandler = mock(SqlHandler.class);
+ JdbcLoader<Object, Object> loader = new JdbcLoader<>(sqlHandler);
+ LoaderHelper loaderHelper = mock(LoaderHelper.class);
+ when(loaderHelper.getRegion()).thenReturn(mock(InternalRegion.class));
+ loader.load(loaderHelper);
+ verify(sqlHandler, times(1)).read(any(), any());
+ }
+
+}
diff --git
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriterIntegrationTest.java
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriterIntegrationTest.java
index 248fcf5..54f6329 100644
---
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriterIntegrationTest.java
+++
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriterIntegrationTest.java
@@ -26,7 +26,9 @@ import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -38,6 +40,7 @@ import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.connectors.jdbc.internal.ConnectionManager;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
import org.apache.geode.connectors.jdbc.internal.TestConfigService;
import org.apache.geode.pdx.PdxInstance;
import org.apache.geode.pdx.PdxReader;
@@ -49,27 +52,34 @@ import
org.apache.geode.test.junit.categories.IntegrationTest;
@Category(IntegrationTest.class)
public class JdbcSynchronousWriterIntegrationTest {
- private Cache cache;
+ private static final String DB_NAME = "DerbyDB";
+ private static final String REGION_TABLE_NAME = "employees";
+ private static final String CONNECTION_URL = "jdbc:derby:memory:" + DB_NAME
+ ";create=true";
+ private Cache cache;
+ private Region<String, PdxInstance> employees;
private Connection connection;
-
private Statement statement;
-
private JdbcSynchronousWriter jdbcWriter;
-
- private String dbName = "DerbyDB";
-
- private String regionTableName = "employees";
-
- private String connectionURL = "jdbc:derby:memory:" + dbName +
";create=true";
+ private PdxInstance pdx1;
+ private PdxInstance pdx2;
+ private Employee employee1;
+ private Employee employee2;
@Before
public void setup() throws Exception {
cache = new CacheFactory().setPdxReadSerialized(false).create();
- connection = DriverManager.getConnection(connectionURL);
+ employees = createRegionWithJDBCSynchronousWriter(REGION_TABLE_NAME);
+ connection = DriverManager.getConnection(CONNECTION_URL);
statement = connection.createStatement();
- statement.execute("Create Table " + regionTableName
+ statement.execute("Create Table " + REGION_TABLE_NAME
+ " (id varchar(10) primary key not null, name varchar(10), age int)");
+ pdx1 =
cache.createPdxInstanceFactory(Employee.class.getName()).writeString("name",
"Emp1")
+ .writeInt("age", 55).create();
+ pdx2 =
cache.createPdxInstanceFactory(Employee.class.getName()).writeString("name",
"Emp2")
+ .writeInt("age", 21).create();
+ employee1 = (Employee) pdx1.getObject();
+ employee2 = (Employee) pdx2.getObject();
}
@After
@@ -82,7 +92,7 @@ public class JdbcSynchronousWriterIntegrationTest {
if (statement == null) {
statement = connection.createStatement();
}
- statement.execute("Drop table " + regionTableName);
+ statement.execute("Drop table " + REGION_TABLE_NAME);
statement.close();
if (connection != null) {
@@ -90,241 +100,141 @@ public class JdbcSynchronousWriterIntegrationTest {
}
}
- private Properties getRequiredProperties() {
- Properties props = new Properties();
- props.setProperty("url", this.connectionURL);
- return props;
- }
-
@Test
public void canInsertIntoTable() throws Exception {
- Region<String, PdxInstance> employees =
- createRegionWithJDBCSynchronousWriter(regionTableName,
getRequiredProperties());
- PdxInstance pdx1 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
- .writeInt("age", 55).create();
- PdxInstance pdx2 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp2")
- .writeInt("age", 21).create();
employees.put("1", pdx1);
employees.put("2", pdx2);
- ResultSet rs = statement.executeQuery("select * from " + regionTableName +
" order by id asc");
- assertThat(rs.next()).isTrue();
- assertThat(rs.getString("id")).isEqualTo("1");
- assertThat(rs.getString("name")).isEqualTo("Emp1");
- assertThat(rs.getObject("age")).isEqualTo(55);
- assertThat(rs.next()).isTrue();
- assertThat(rs.getString("id")).isEqualTo("2");
- assertThat(rs.getString("name")).isEqualTo("Emp2");
- assertThat(rs.getObject("age")).isEqualTo(21);
- assertThat(rs.next()).isFalse();
+ ResultSet resultSet =
+ statement.executeQuery("select * from " + REGION_TABLE_NAME + " order
by id asc");
+ assertRecordMatchesEmployee(resultSet, "1", employee1);
+ assertRecordMatchesEmployee(resultSet, "2", employee2);
+ assertThat(resultSet.next()).isFalse();
}
@Test
public void canPutAllInsertIntoTable() throws Exception {
- Region<String, PdxInstance> employees =
- createRegionWithJDBCSynchronousWriter(regionTableName,
getRequiredProperties());
- PdxInstance pdx1 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
- .writeInt("age", 55).create();
- PdxInstance pdx2 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp2")
- .writeInt("age", 21).create();
Map<String, PdxInstance> putAllMap = new HashMap<>();
putAllMap.put("1", pdx1);
putAllMap.put("2", pdx2);
employees.putAll(putAllMap);
- ResultSet rs = statement.executeQuery("select * from " + regionTableName +
" order by id asc");
- assertThat(rs.next()).isTrue();
- assertThat(rs.getString("id")).isEqualTo("1");
- assertThat(rs.getString("name")).isEqualTo("Emp1");
- assertThat(rs.getObject("age")).isEqualTo(55);
- assertThat(rs.next()).isTrue();
- assertThat(rs.getString("id")).isEqualTo("2");
- assertThat(rs.getString("name")).isEqualTo("Emp2");
- assertThat(rs.getObject("age")).isEqualTo(21);
- assertThat(rs.next()).isFalse();
+ ResultSet resultSet =
+ statement.executeQuery("select * from " + REGION_TABLE_NAME + " order
by id asc");
+ assertRecordMatchesEmployee(resultSet, "1", employee1);
+ assertRecordMatchesEmployee(resultSet, "2", employee2);
+ assertThat(resultSet.next()).isFalse();
}
@Test
public void verifyThatPdxFieldNamedSameAsPrimaryKeyIsIgnored() throws
Exception {
- Region<String, PdxInstance> employees =
- createRegionWithJDBCSynchronousWriter(regionTableName,
getRequiredProperties());
- PdxInstance pdx1 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
- .writeInt("age", 55).writeInt("id", 3).create();
- employees.put("1", pdx1);
+ PdxInstance pdxInstanceWithId =
cache.createPdxInstanceFactory(Employee.class.getName())
+ .writeString("name", "Emp1").writeInt("age", 55).writeInt("id",
3).create();
+ employees.put("1", pdxInstanceWithId);
- ResultSet rs = statement.executeQuery("select * from " + regionTableName +
" order by id asc");
- assertThat(rs.next()).isTrue();
- assertThat(rs.getString("id")).isEqualTo("1");
- assertThat(rs.getString("name")).isEqualTo("Emp1");
- assertThat(rs.getObject("age")).isEqualTo(55);
- assertThat(rs.next()).isFalse();
+ ResultSet resultSet =
+ statement.executeQuery("select * from " + REGION_TABLE_NAME + " order
by id asc");
+ assertRecordMatchesEmployee(resultSet, "1", (Employee)
pdxInstanceWithId.getObject());
+ assertThat(resultSet.next()).isFalse();
}
@Test
public void putNonPdxInstanceFails() {
- Region employees =
- createRegionWithJDBCSynchronousWriter(regionTableName,
getRequiredProperties());
- catchException(employees).put("1", "non pdx instance");
- assertThat((Exception)
caughtException()).isInstanceOf(ClassCastException.class);
- assertThat(caughtException().getMessage())
- .isEqualTo("java.lang.String cannot be cast to
org.apache.geode.pdx.PdxInstance");
+ Region nonPdxEmployees = this.employees;
+ catchException(nonPdxEmployees).put("1", "non pdx instance");
+ assertThat((Exception)
caughtException()).isInstanceOf(IllegalArgumentException.class);
}
@Test
public void putNonPdxInstanceThatIsPdxSerializable() throws SQLException {
- Region employees =
- createRegionWithJDBCSynchronousWriter(regionTableName,
getRequiredProperties());
- Object value = new TestEmployee("Emp2", 22);
- employees.put("2", value);
-
- ResultSet rs = statement.executeQuery("select * from " + regionTableName +
" order by id asc");
- assertThat(rs.next()).isTrue();
- assertThat(rs.getString("id")).isEqualTo("2");
- assertThat(rs.getString("name")).isEqualTo("Emp2");
- assertThat(rs.getObject("age")).isEqualTo(22);
- assertThat(rs.next()).isFalse();
- }
+ Region nonPdxEmployees = this.employees;
+ Employee value = new Employee("Emp2", 22);
+ nonPdxEmployees.put("2", value);
- public static class TestEmployee implements PdxSerializable {
- private String name;
- private int age;
-
- TestEmployee(String name, int age) {
- this.name = name;
- this.age = age;
- }
-
- @Override
- public void toData(PdxWriter writer) {
- writer.writeString("name", this.name);
- writer.writeInt("age", this.age);
- }
-
- @Override
- public void fromData(PdxReader reader) {
- this.name = reader.readString("name");
- this.age = reader.readInt("age");
- }
+ ResultSet resultSet =
+ statement.executeQuery("select * from " + REGION_TABLE_NAME + " order
by id asc");
+ assertRecordMatchesEmployee(resultSet, "2", value);
+ assertThat(resultSet.next()).isFalse();
}
@Test
public void canDestroyFromTable() throws Exception {
- Region<String, PdxInstance> employees =
- createRegionWithJDBCSynchronousWriter(regionTableName,
getRequiredProperties());
- PdxInstance pdx1 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
- .writeInt("age", 55).create();
- PdxInstance pdx2 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp2")
- .writeInt("age", 21).create();
employees.put("1", pdx1);
employees.put("2", pdx2);
- try {
- employees.destroy("1");
- } catch (PdxSerializationException ignore) {
- // destroy tries to deserialize old value
- // which does not work because our PdxInstance
- // does not have a real class
- }
+ employees.destroy("1");
- ResultSet rs = statement.executeQuery("select * from " + regionTableName +
" order by id asc");
- assertThat(rs.next()).isTrue();
- assertThat(rs.getString("id")).isEqualTo("2");
- assertThat(rs.getString("name")).isEqualTo("Emp2");
- assertThat(rs.getObject("age")).isEqualTo(21);
- assertThat(rs.next()).isFalse();
+ ResultSet resultSet =
+ statement.executeQuery("select * from " + REGION_TABLE_NAME + " order
by id asc");
+ assertRecordMatchesEmployee(resultSet, "2", employee2);
+ assertThat(resultSet.next()).isFalse();
}
@Test
public void canUpdateTable() throws Exception {
- Region<String, PdxInstance> employees =
- createRegionWithJDBCSynchronousWriter(regionTableName,
getRequiredProperties());
- PdxInstance pdx1 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
- .writeInt("age", 55).create();
employees.put("1", pdx1);
+ employees.put("1", pdx2);
- PdxInstance pdx3 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
- .writeInt("age", 72).create();
- try {
- employees.put("1", pdx3);
- } catch (PdxSerializationException ignore) {
- // put tries to deserialize old value
- // which does not work because our PdxInstance
- // does not have a real class
- }
-
- ResultSet rs = statement.executeQuery("select * from " + regionTableName +
" order by id asc");
- assertThat(rs.next()).isTrue();
- assertThat(rs.getObject("age")).isEqualTo(72);
- assertThat(rs.next()).isFalse();
+ ResultSet resultSet =
+ statement.executeQuery("select * from " + REGION_TABLE_NAME + " order
by id asc");
+ assertRecordMatchesEmployee(resultSet, "1", employee2);
+ assertThat(resultSet.next()).isFalse();
}
@Test
public void canUpdateBecomeInsert() throws Exception {
- Region<String, PdxInstance> employees =
- createRegionWithJDBCSynchronousWriter(regionTableName,
getRequiredProperties());
- PdxInstance pdx1 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
- .writeInt("age", 55).create();
employees.put("1", pdx1);
- statement.execute("delete from " + regionTableName + " where id = '1'");
+ statement.execute("delete from " + REGION_TABLE_NAME + " where id = '1'");
validateTableRowCount(0);
- PdxInstance pdx3 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
- .writeInt("age", 72).create();
- try {
- employees.put("1", pdx3);
- } catch (PdxSerializationException ignore) {
- // put tries to deserialize old value
- // which does not work because our PdxInstance
- // does not have a real class
- }
+ employees.put("1", pdx2);
- ResultSet rs = statement.executeQuery("select * from " + regionTableName +
" order by id asc");
- assertThat(rs.next()).isTrue();
- assertThat(rs.getString("id")).isEqualTo("1");
- assertThat(rs.getString("name")).isEqualTo("Emp1");
- assertThat(rs.getObject("age")).isEqualTo(72);
- assertThat(rs.next()).isFalse();
+ ResultSet resultSet =
+ statement.executeQuery("select * from " + REGION_TABLE_NAME + " order
by id asc");
+ assertRecordMatchesEmployee(resultSet, "1", employee2);
+ assertThat(resultSet.next()).isFalse();
}
@Test
public void canInsertBecomeUpdate() throws Exception {
- statement.execute("Insert into " + regionTableName + " values('1',
'bogus', 11)");
+ statement.execute("Insert into " + REGION_TABLE_NAME + " values('1',
'bogus', 11)");
validateTableRowCount(1);
- Region<String, PdxInstance> employees =
- createRegionWithJDBCSynchronousWriter(regionTableName,
getRequiredProperties());
- PdxInstance pdx1 =
cache.createPdxInstanceFactory("Employee").writeString("name", "Emp1")
- .writeInt("age", 55).create();
employees.put("1", pdx1);
- ResultSet rs = statement.executeQuery("select * from " + regionTableName +
" order by id asc");
- assertThat(rs.next()).isTrue();
- assertThat(rs.getString("id")).isEqualTo("1");
- assertThat(rs.getString("name")).isEqualTo("Emp1");
- assertThat(rs.getObject("age")).isEqualTo(55);
- assertThat(rs.next()).isFalse();
+ ResultSet resultSet =
+ statement.executeQuery("select * from " + REGION_TABLE_NAME + " order
by id asc");
+ assertRecordMatchesEmployee(resultSet, "1", employee1);
+ assertThat(resultSet.next()).isFalse();
}
- private Region<String, PdxInstance>
createRegionWithJDBCSynchronousWriter(String regionName,
- Properties props) {
- jdbcWriter = new JdbcSynchronousWriter(createManager());
- jdbcWriter.init(props);
+ private Region<String, PdxInstance>
createRegionWithJDBCSynchronousWriter(String regionName) {
+ jdbcWriter = new JdbcSynchronousWriter(createSqlHandler());
+ jdbcWriter.init(new Properties());
- RegionFactory<String, PdxInstance> rf =
cache.createRegionFactory(RegionShortcut.REPLICATE);
- rf.setCacheWriter(jdbcWriter);
- return rf.create(regionName);
+ RegionFactory<String, PdxInstance> regionFactory =
+ cache.createRegionFactory(RegionShortcut.REPLICATE);
+ regionFactory.setCacheWriter(jdbcWriter);
+ return regionFactory.create(regionName);
}
private void validateTableRowCount(int expected) throws Exception {
- ResultSet rs = statement.executeQuery("select count(*) from " +
regionTableName);
- rs.next();
- int size = rs.getInt(1);
+ ResultSet resultSet = statement.executeQuery("select count(*) from " +
REGION_TABLE_NAME);
+ resultSet.next();
+ int size = resultSet.getInt(1);
assertThat(size).isEqualTo(expected);
}
- private ConnectionManager createManager() {
- return new ConnectionManager(TestConfigService.getTestConfigService());
+ private SqlHandler createSqlHandler() {
+ return new SqlHandler(new
ConnectionManager(TestConfigService.getTestConfigService()));
}
+ private void assertRecordMatchesEmployee(ResultSet resultSet, String key,
Employee employee)
+ throws SQLException {
+ assertThat(resultSet.next()).isTrue();
+ assertThat(resultSet.getString("id")).isEqualTo(key);
+ assertThat(resultSet.getString("name")).isEqualTo(employee.getName());
+ assertThat(resultSet.getObject("age")).isEqualTo(employee.getAge());
+ }
}
diff --git
a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriterTest.java
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriterTest.java
new file mode 100644
index 0000000..9a3fcfc
--- /dev/null
+++
b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JdbcSynchronousWriterTest.java
@@ -0,0 +1,115 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license *
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding *
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance with the
License. You may obtain a *
+ * copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * *
Unless required by
+ * applicable law or agreed to in writing, software distributed under the
License * is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express * or implied.
+ * See the License for the specific language governing permissions and
limitations under * the
+ * License.
+ *
+ */
+package org.apache.geode.connectors.jdbc;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.SerializedCacheValue;
+import org.apache.geode.connectors.jdbc.internal.SqlHandler;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.pdx.PdxInstance;
+
+public class JdbcSynchronousWriterTest {
+
+ private EntryEvent<Object, Object> entryEvent;
+ private PdxInstance pdxInstance;
+ private SqlHandler sqlHandler;
+
+ @Before
+ public void setUp() {
+ entryEvent = mock(EntryEvent.class);
+ pdxInstance = mock(PdxInstance.class);
+ SerializedCacheValue<Object> serializedNewValue =
mock(SerializedCacheValue.class);
+ sqlHandler = mock(SqlHandler.class);
+
+ when(entryEvent.getRegion()).thenReturn(mock(InternalRegion.class));
+ when(entryEvent.getSerializedNewValue()).thenReturn(serializedNewValue);
+ when(serializedNewValue.getDeserializedValue()).thenReturn(pdxInstance);
+
+ }
+
+ @Test
+ public void beforeUpdateWithPdxInstanceWritesToSqlHandler() {
+ JdbcSynchronousWriter<Object, Object> writer = new
JdbcSynchronousWriter<>(sqlHandler);
+
+ writer.beforeUpdate(entryEvent);
+
+ verify(sqlHandler, times(1)).write(any(), any(), any(), eq(pdxInstance));
+ }
+
+ @Test
+ public void beforeUpdateWithoutPdxInstanceWritesToSqlHandler() {
+ EntryEvent<Object, Object> entryEvent = mock(EntryEvent.class);
+ Object value = new Object();
+ SerializedCacheValue<Object> serializedNewValue =
mock(SerializedCacheValue.class);
+ SqlHandler sqlHander = mock(SqlHandler.class);
+
+ when(entryEvent.getRegion()).thenReturn(mock(InternalRegion.class));
+ when(entryEvent.getSerializedNewValue()).thenReturn(serializedNewValue);
+ when(serializedNewValue.getDeserializedValue()).thenReturn(value);
+
+ JdbcSynchronousWriter<Object, Object> writer = new
JdbcSynchronousWriter<>(sqlHander);
+
+ assertThatThrownBy(() -> writer.beforeUpdate(entryEvent))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ public void beforeCreateWithPdxInstanceWritesToSqlHandler() {
+ JdbcSynchronousWriter<Object, Object> writer = new
JdbcSynchronousWriter<>(sqlHandler);
+
+ writer.beforeCreate(entryEvent);
+
+ verify(sqlHandler, times(1)).write(any(), any(), any(), eq(pdxInstance));
+ }
+
+ @Test
+ public void beforeDestroyWithPdxInstanceWritesToSqlHandler() {
+ JdbcSynchronousWriter<Object, Object> writer = new
JdbcSynchronousWriter<>(sqlHandler);
+
+ writer.beforeDestroy(entryEvent);
+
+ verify(sqlHandler, times(1)).write(any(), any(), any(), eq(pdxInstance));
+ }
+
+ @Test
+ public void beforeRegionDestroyDoesNotWriteToSqlHandler() {
+ JdbcSynchronousWriter<Object, Object> writer = new
JdbcSynchronousWriter<>(sqlHandler);
+
+ writer.beforeRegionDestroy(mock(RegionEvent.class));
+
+ verifyZeroInteractions(sqlHandler);
+ }
+
+ @Test
+ public void beforeRegionClearDoesNotWriteToSqlHandler() {
+ JdbcSynchronousWriter<Object, Object> writer = new
JdbcSynchronousWriter<>(sqlHandler);
+
+ writer.beforeRegionClear(mock(RegionEvent.class));
+
+ verifyZeroInteractions(sqlHandler);
+ }
+}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].