This is an automated email from the ASF dual-hosted git repository. agingade pushed a commit to branch feature/GEODE-3781 in repository https://gitbox.apache.org/repos/asf/geode.git
commit f312f305077fe29887ee8c1e7ccd5a4d511fab15 Author: Anil <[email protected]> AuthorDate: Mon Oct 23 12:01:58 2017 -0700 Initial project setup for jdbc-connector. Added basic source and test implementation for JDBCAsyncWriter. --- geode-assembly/build.gradle | 1 + geode-connectors/build.gradle | 32 +++++ .../geode/connectors/jdbc/JDBCAsyncWriter.java | 58 +++++++++ .../apache/geode/connectors/jdbc/JDBCHelper.java | 5 + .../jdbc/JDBCAsyncWriterIntegrationTest.java | 144 +++++++++++++++++++++ settings.gradle | 1 + 6 files changed, 241 insertions(+) diff --git a/geode-assembly/build.gradle b/geode-assembly/build.gradle index 48a5cfa..ac0ac7e 100755 --- a/geode-assembly/build.gradle +++ b/geode-assembly/build.gradle @@ -58,6 +58,7 @@ dependencies { archives project(':geode-common') archives project(':geode-json') archives project(':geode-core') + archives project(':geode-connectors') archives project(':geode-lucene') archives project(':geode-old-client-support') archives project(':geode-protobuf') diff --git a/geode-connectors/build.gradle b/geode-connectors/build.gradle new file mode 100644 index 0000000..b11f352 --- /dev/null +++ b/geode-connectors/build.gradle @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +dependencies { + compile project(':geode-core') + compile project(':geode-common') + + testCompile project(':geode-junit') + + //Connectors test framework. + testRuntime 'org.apache.derby:derby:' + project.'derby.version' + testCompile 'com.pholser:junit-quickcheck-core:' + project.'junit-quickcheck.version' + testCompile 'com.pholser:junit-quickcheck-generators:' + project.'junit-quickcheck.version' + testCompile files(project(':geode-core').sourceSets.test.output) + testCompile project(':geode-old-versions') +} + +integrationTest.forkEvery 0 diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java new file mode 100644 index 0000000..cc0b30d --- /dev/null +++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriter.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.connectors.jdbc; + +import java.util.List; +import java.util.Properties; + +import org.apache.geode.cache.asyncqueue.AsyncEvent; +import org.apache.geode.cache.asyncqueue.AsyncEventListener; + +/* + * This class provides write behind cache semantics for a JDBC data source using AsyncEventListener. + * + * @since Geode 1.4 + */ +public class JDBCAsyncWriter implements AsyncEventListener { + + private long totalEvents = 0; + + private long successfulEvents = 0; + + @Override + public void close() { + // TODO Auto-generated method stub + + } + + @Override + public boolean processEvents(List<AsyncEvent> events) { + totalEvents += events.size(); + return true; + } + + @Override + public void init(Properties props) { + + }; + + public long getTotalEvents() { + return this.totalEvents; + } + + public long getsuccessfulEvents() { + return this.successfulEvents; + } +} diff --git a/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCHelper.java b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCHelper.java new file mode 100644 index 0000000..edad4c4 --- /dev/null +++ b/geode-connectors/src/main/java/org/apache/geode/connectors/jdbc/JDBCHelper.java @@ -0,0 +1,5 @@ +package org.apache.geode.connectors.jdbc; + +public class JDBCHelper { + +} diff --git a/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java new file mode 100644 index 0000000..d135dfc --- /dev/null +++ b/geode-connectors/src/test/java/org/apache/geode/connectors/jdbc/JDBCAsyncWriterIntegrationTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.connectors.jdbc; + +import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.assertj.core.api.Assertions.*; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.concurrent.TimeUnit; + +import org.junit.*; +import org.junit.experimental.categories.Category; +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.asyncqueue.AsyncEventQueue; +import org.apache.geode.internal.cache.GemFireCacheImpl; +import org.apache.geode.test.junit.categories.IntegrationTest; +import org.awaitility.Awaitility; + +@Category(IntegrationTest.class) +public class JDBCAsyncWriterIntegrationTest { + + private Cache cache; + + private Connection conn; + + private Statement stmt; + + JDBCAsyncWriter jdbcWriter; + + private String dbName="DerbyDB"; + + private String regionTableName = "employees"; + + @Before + public void setup() throws Exception { + try { + cache = CacheFactory.getAnyInstance(); + } catch (Exception e) { + // ignore + } + if (null == cache) { + cache = (GemFireCacheImpl) new CacheFactory().set(MCAST_PORT, "0").create(); + } + setupDB(); + } + + @After + public void tearDown() throws Exception { + if (cache != null && !cache.isClosed()) { + cache.close(); + cache = null; + } + closeDB(); + } + + public void setupDB() throws Exception { + String driver = "org.apache.derby.jdbc.EmbeddedDriver"; + String connectionURL = "jdbc:derby:memory:" + dbName + ";create=true"; + Class.forName(driver); + conn = DriverManager.getConnection(connectionURL); + stmt = conn.createStatement(); + stmt.execute("Create Table " + regionTableName + " (id varchar(10), name varchar(10))"); + } + + public void closeDB() throws Exception { + if (stmt == null) { + stmt = conn.createStatement(); + } + stmt.execute("Drop table " + regionTableName); + stmt.close(); + + if (conn != null) { + conn.close(); + } + } + + @Test + public void canInstallJDBCAsyncWriterOnRegion() { + Region employees = createRegionWithJDBCAsyncWriter("employees"); + employees.put("1", "Emp1"); + try {Thread.sleep(100);} catch (Exception ex){} + employees.put("2", "Emp2"); + + Awaitility.await().atMost(30, TimeUnit.SECONDS) + .until(() -> assertThat(jdbcWriter.getTotalEvents()).isEqualTo(2)); + + } + + @Test + public void jdbcAsyncWriterCanWriteToDatabase() throws Exception { + Region employees = createRegionWithJDBCAsyncWriter("employees"); + + employees.put("1", "Emp1"); + employees.put("2", "Emp2"); + + validateTableRowCount(2); + } + + private Region createRegionWithJDBCAsyncWriter(String regionName) { + jdbcWriter = new JDBCAsyncWriter(); + cache.createAsyncEventQueueFactory().setBatchSize(1) + .setBatchTimeInterval(1).create("jdbcAsyncQueue", jdbcWriter); + + RegionFactory rf = cache.createRegionFactory(RegionShortcut.REPLICATE); + rf.addAsyncEventQueueId("jdbcAsyncQueue"); + return rf.create(regionName); + } + + private void validateTableRowCount(int expected) throws Exception { + Awaitility.await().atMost(1, TimeUnit.SECONDS).until(() -> { + int size = 0; + try { + ResultSet rs = stmt.executeQuery("select count(*) from " + regionTableName); + while (rs.next()) { + size = rs.getInt(1); + } + } catch (Exception ex) { + // Need to fix this. + System.out.println("Exception while getting the table row count"); + } + assertThat(size).isEqualTo(expected); + }); + } + +} diff --git a/settings.gradle b/settings.gradle index 87f4a9c..9cbad6d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -40,6 +40,7 @@ include 'extensions/geode-modules-assembly' include 'geode-protobuf' include 'extensions/session-testing-war' include 'geode-concurrency-test' +include 'geode-connectors' if (GradleVersion.current() < GradleVersion.version(minimumGradleVersion)) { throw new GradleException('Running with unsupported Gradle Version. Use Gradle Wrapper or with Gradle version >= ' + minimumGradleVersion) -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
