This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 8d8e2c6 [GOBBLIN-768] Add MySQL implementation of SpecStore
8d8e2c6 is described below
commit 8d8e2c67117d74aaa5d5af13ae5c39604b9ac459
Author: Jack Moseley <[email protected]>
AuthorDate: Mon May 20 11:49:50 2019 -0700
[GOBBLIN-768] Add MySQL implementation of SpecStore
Closes #2631 from jack-moseley/mysql-spec-store
---
.../gobblin/runtime/spec_store/MysqlSpecStore.java | 233 +++++++++++++++++++++
.../runtime/spec_store/MysqlSpecStoreTest.java | 145 +++++++++++++
2 files changed, 378 insertions(+)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
new file mode 100644
index 0000000..6206a3f
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_store;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.lang3.SerializationException;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import com.typesafe.config.Config;
+
+import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecStore;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Implementation of {@link SpecStore} that keeps specs in a MySQL table, one row per spec, with
+ * the spec body stored as the bytes produced by the {@link SpecSerDe} given to the constructor.
+ *
+ * <p>Rows are keyed by {@code (spec_uri, spec_source)} and are always written with this store's
+ * configured source. Lookups and deletes match on {@code spec_uri} only.
+ *
+ * <p>Versions are not supported, so the version parameter is ignored in methods that have it.
+ */
+@Slf4j
+public class MysqlSpecStore implements SpecStore {
+  public static final String CONFIG_PREFIX = "mysqlSpecStore";
+  public static final String SPEC_STORE_SOURCE = "source";
+  public static final String DEFAULT_SPEC_STORE_SOURCE = "default_source";
+
+  // All statements take the table name through String.format; values are bound as parameters.
+  private static final String CREATE_TABLE_STATEMENT =
+      "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR(128) NOT NULL, "
+          + "spec_source VARCHAR(128) NOT NULL, spec LONGBLOB, PRIMARY KEY (spec_uri, spec_source))";
+  private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM %s WHERE spec_uri = ?)";
+  private static final String INSERT_STATEMENT =
+      "INSERT INTO %s (spec_uri, spec_source, spec) VALUES (?, ?, ?) "
+          + "ON DUPLICATE KEY UPDATE spec = VALUES(spec)";
+  private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE spec_uri = ?";
+  private static final String GET_STATEMENT = "SELECT spec FROM %s WHERE spec_uri = ?";
+  private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec FROM %s";
+  // URI-only variant so that listing URIs does not transfer every serialized spec.
+  private static final String GET_ALL_URIS_STATEMENT = "SELECT spec_uri FROM %s";
+
+  private final DataSource dataSource;
+  private final String tableName;
+  private final URI specStoreURI;
+  private final SpecSerDe specSerDe;
+  private final String specStoreSource;
+
+  /**
+   * Builds the store and creates the backing table if it does not exist yet.
+   *
+   * @param config store configuration; when it contains a {@value #CONFIG_PREFIX} section, keys in
+   *        that section take precedence over the top-level ones
+   * @param specSerDe converts specs to and from the bytes stored in the {@code spec} column
+   * @throws IOException if the table cannot be created
+   */
+  public MysqlSpecStore(Config config, SpecSerDe specSerDe) throws IOException {
+    if (config.hasPath(CONFIG_PREFIX)) {
+      config = config.getConfig(CONFIG_PREFIX).withFallback(config);
+    }
+
+    this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
+    this.tableName = config.getString(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY);
+    this.specStoreURI = URI.create(config.getString(ConfigurationKeys.STATE_STORE_DB_URL_KEY));
+    this.specSerDe = specSerDe;
+    this.specStoreSource = ConfigUtils.getString(config, SPEC_STORE_SOURCE, DEFAULT_SPEC_STORE_SOURCE);
+
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement =
+            connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, this.tableName))) {
+      statement.executeUpdate();
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Checks whether any row with the given URI is present.
+   *
+   * @param specUri URI to look for
+   * @return {@code true} if at least one matching row exists
+   * @throws IOException if the query fails
+   */
+  @Override
+  public boolean exists(URI specUri) throws IOException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement =
+            connection.prepareStatement(String.format(EXISTS_STATEMENT, this.tableName))) {
+      statement.setString(1, specUri.toString());
+      try (ResultSet rs = statement.executeQuery()) {
+        // Guard on next() so an unexpectedly empty result reads as "absent" instead of failing.
+        return rs.next() && rs.getBoolean(1);
+      }
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Inserts the spec, or overwrites the stored bytes if a row with the same URI and source
+   * already exists.
+   *
+   * @param spec spec to persist
+   * @throws IOException if serialization or the database write fails
+   */
+  @Override
+  public void addSpec(Spec spec) throws IOException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement =
+            connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
+
+      statement.setString(1, spec.getUri().toString());
+      statement.setString(2, this.specStoreSource);
+      statement.setBlob(3, new ByteArrayInputStream(this.specSerDe.serialize(spec)));
+      statement.executeUpdate();
+
+      connection.commit();
+    } catch (SQLException | SerializationException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Deletes by the spec's URI; see {@link #deleteSpec(URI)}. */
+  @Override
+  public boolean deleteSpec(Spec spec) throws IOException {
+    return deleteSpec(spec.getUri());
+  }
+
+  /**
+   * Deletes every row with the given URI.
+   *
+   * @param specUri URI of the spec to remove
+   * @return {@code true} if at least one row was deleted
+   * @throws IOException if the delete fails
+   */
+  @Override
+  public boolean deleteSpec(URI specUri) throws IOException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement =
+            connection.prepareStatement(String.format(DELETE_STATEMENT, this.tableName))) {
+      statement.setString(1, specUri.toString());
+      int deletedRows = statement.executeUpdate();
+      connection.commit();
+      return deletedRows != 0;
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Same as {@link #deleteSpec(URI)}; the version is ignored. */
+  @Override
+  public boolean deleteSpec(URI specUri, String version) throws IOException {
+    return deleteSpec(specUri);
+  }
+
+  /**
+   * Writes the spec through {@link #addSpec(Spec)}, so a spec that is not stored yet is inserted
+   * rather than reported as missing.
+   *
+   * @return the spec that was passed in
+   */
+  @Override
+  public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
+    addSpec(spec);
+    return spec;
+  }
+
+  /**
+   * Loads and deserializes the spec stored under the given URI.
+   *
+   * @param specUri URI of the spec to load
+   * @return the deserialized spec
+   * @throws SpecNotFoundException if no row has this URI
+   * @throws IOException if the query fails or the stored bytes are missing or cannot be
+   *         deserialized
+   */
+  @Override
+  public Spec getSpec(URI specUri) throws IOException, SpecNotFoundException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement =
+            connection.prepareStatement(String.format(GET_STATEMENT, this.tableName))) {
+
+      statement.setString(1, specUri.toString());
+
+      try (ResultSet rs = statement.executeQuery()) {
+        if (!rs.next()) {
+          throw new SpecNotFoundException(specUri);
+        }
+
+        return readSpec(rs, 1);
+      }
+    } catch (SQLException | SerializationException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Same as {@link #getSpec(URI)}; the version is ignored. */
+  @Override
+  public Spec getSpec(URI specUri, String version) throws IOException, SpecNotFoundException {
+    return getSpec(specUri);
+  }
+
+  /** Returns a single-element collection holding the result of {@link #getSpec(URI)}. */
+  @Override
+  public Collection<Spec> getAllVersionsOfSpec(URI specUri) throws IOException, SpecNotFoundException {
+    return Lists.newArrayList(getSpec(specUri));
+  }
+
+  /**
+   * Loads every spec in the table. A row whose bytes are missing or cannot be deserialized is
+   * logged and skipped so that one bad row does not hide the others.
+   *
+   * @return the specs that could be deserialized
+   * @throws IOException if the query itself fails
+   */
+  @Override
+  public Collection<Spec> getSpecs() throws IOException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement =
+            connection.prepareStatement(String.format(GET_ALL_STATEMENT, this.tableName))) {
+
+      List<Spec> specs = new ArrayList<>();
+
+      try (ResultSet rs = statement.executeQuery()) {
+        while (rs.next()) {
+          // Read the URI inside the try so a failure there is also treated as a bad row.
+          String specUri = null;
+          try {
+            specUri = rs.getString(1);
+            specs.add(readSpec(rs, 2));
+          } catch (SQLException | SerializationException e) {
+            log.error("Failed to deserialize spec {}", specUri, e);
+          }
+        }
+      }
+
+      return specs;
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Lists the URIs of all stored specs without fetching the serialized spec bodies.
+   *
+   * @return an iterator over a snapshot of the URIs
+   * @throws IOException if the query fails
+   */
+  @Override
+  public Iterator<URI> getSpecURIs() throws IOException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement =
+            connection.prepareStatement(String.format(GET_ALL_URIS_STATEMENT, this.tableName))) {
+
+      List<URI> specUris = new ArrayList<>();
+
+      try (ResultSet rs = statement.executeQuery()) {
+        while (rs.next()) {
+          specUris.add(URI.create(rs.getString(1)));
+        }
+      }
+
+      return specUris.iterator();
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Returns the configured database URL as this store's URI. */
+  @Override
+  public Optional<URI> getSpecStoreURI() {
+    return Optional.of(this.specStoreURI);
+  }
+
+  /**
+   * Reads the blob in the given column of the current row and deserializes it.
+   *
+   * @throws SerializationException if the column is SQL NULL (the column is nullable) or the
+   *         serde rejects the bytes
+   */
+  private Spec readSpec(ResultSet rs, int column) throws SQLException, IOException {
+    Blob blob = rs.getBlob(column);
+    if (blob == null) {
+      throw new SerializationException("Stored spec is NULL");
+    }
+    return this.specSerDe.deserialize(ByteStreams.toByteArray(blob.getBinaryStream()));
+  }
+}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
new file mode 100644
index 0000000..4d3c89d
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_store;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.SerializationUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+
+
+public class MysqlSpecStoreTest {
+ private static final String USER = "testUser";
+ private static final String PASSWORD = "testPassword";
+ private static final String TABLE = "spec_store";
+
+ private MysqlSpecStore specStore;
+ private URI uri1 = URI.create("flowspec1");
+ private URI uri2 = URI.create("flowspec2");
+ private URI uri3 = URI.create("flowspec3");
+ private FlowSpec flowSpec1 = FlowSpec.builder(this.uri1)
+ .withConfig(ConfigBuilder.create().addPrimitive("key", "value").build())
+ .withDescription("Test flow spec")
+ .withVersion("Test version")
+ .build();
+ private FlowSpec flowSpec2 = FlowSpec.builder(this.uri2)
+ .withConfig(ConfigBuilder.create().addPrimitive("key2",
"value2").build())
+ .withDescription("Test flow spec 2")
+ .withVersion("Test version 2")
+ .build();
+ private FlowSpec flowSpec3 = FlowSpec.builder(this.uri3)
+ .withConfig(ConfigBuilder.create().addPrimitive("key3",
"value3").build())
+ .withDescription("Test flow spec 3")
+ .withVersion("Test version 3")
+ .build();
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+ Config config = ConfigBuilder.create()
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY,
testDb.getJdbcUrl())
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+ .addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+ .build();
+
+ this.specStore = new MysqlSpecStore(config, new TestSpecSerDe());
+ }
+
+ @Test
+ public void testAddSpec() throws Exception {
+ this.specStore.addSpec(this.flowSpec1);
+ this.specStore.addSpec(this.flowSpec2);
+
+ Assert.assertTrue(this.specStore.exists(this.uri1));
+ Assert.assertTrue(this.specStore.exists(this.uri2));
+ Assert.assertFalse(this.specStore.exists(URI.create("dummy")));
+ }
+
+ @Test
+ public void testGetSpec() throws Exception {
+ this.specStore.addSpec(this.flowSpec1);
+ this.specStore.addSpec(this.flowSpec2);
+
+ FlowSpec result = (FlowSpec) this.specStore.getSpec(this.uri1);
+ Assert.assertEquals(result, this.flowSpec1);
+
+ Collection<Spec> specs = this.specStore.getSpecs();
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+
+ Iterator<URI> uris = this.specStore.getSpecURIs();
+ Assert.assertTrue(Iterators.contains(uris, this.uri1));
+ Assert.assertTrue(Iterators.contains(uris, this.uri2));
+ }
+
+ @Test
+ public void testGetCorruptedSpec() throws Exception {
+ this.specStore.addSpec(this.flowSpec1);
+ this.specStore.addSpec(this.flowSpec2);
+ this.specStore.addSpec(this.flowSpec3);
+
+ Collection<Spec> specs = this.specStore.getSpecs();
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+ Assert.assertFalse(specs.contains(this.flowSpec3));
+ }
+
+ @Test
+ public void testDeleteSpec() throws Exception {
+ this.specStore.addSpec(this.flowSpec1);
+ Assert.assertTrue(this.specStore.exists(this.uri1));
+
+ this.specStore.deleteSpec(this.uri1);
+ Assert.assertFalse(this.specStore.exists(this.uri1));
+ }
+
+ public class TestSpecSerDe implements SpecSerDe {
+ @Override
+ public byte[] serialize(Spec spec) {
+ byte[] bytes = SerializationUtils.serialize(spec);
+ // Reverse bytes to simulate corrupted Spec
+ if (spec.getUri().equals(uri3)) {
+ ArrayUtils.reverse(bytes);
+ }
+ return bytes;
+ }
+
+ @Override
+ public Spec deserialize(byte[] spec) {
+ return SerializationUtils.deserialize(spec);
+ }
+ }
+}
\ No newline at end of file