This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d8e2c6  [GOBBLIN-768] Add MySQL implementation of SpecStore
8d8e2c6 is described below

commit 8d8e2c67117d74aaa5d5af13ae5c39604b9ac459
Author: Jack Moseley <[email protected]>
AuthorDate: Mon May 20 11:49:50 2019 -0700

    [GOBBLIN-768] Add MySQL implementation of SpecStore
    
    Closes #2631 from jack-moseley/mysql-spec-store
---
 .../gobblin/runtime/spec_store/MysqlSpecStore.java | 233 +++++++++++++++++++++
 .../runtime/spec_store/MysqlSpecStoreTest.java     | 145 +++++++++++++
 2 files changed, 378 insertions(+)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
new file mode 100644
index 0000000..6206a3f
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_store;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Blob;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.lang3.SerializationException;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import com.typesafe.config.Config;
+
+import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecStore;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Implementation of {@link SpecStore} that stores specs as serialized java 
objects in MySQL. Note that versions are not
+ * supported, so the version parameter will be ignored in methods that have it.
+ */
+@Slf4j
+public class MysqlSpecStore implements SpecStore {
+  public static final String CONFIG_PREFIX = "mysqlSpecStore";
+  public static final String SPEC_STORE_SOURCE = "source";
+  public static final String DEFAULT_SPEC_STORE_SOURCE = "default_source";
+
+  private static final String CREATE_TABLE_STATEMENT =
+      "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR(128) NOT NULL, 
spec_source VARCHAR(128) NOT NULL, spec LONGBLOB, PRIMARY KEY (spec_uri, 
spec_source))";
+  private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM 
%s WHERE spec_uri = ?)";
+  private static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, 
spec_source, spec) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE spec = 
VALUES(spec)";
+  private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE 
spec_uri = ?";
+  private static final String GET_STATEMENT = "SELECT spec FROM %s WHERE 
spec_uri = ?";
+  private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec FROM 
%s";
+
+  private final DataSource dataSource;
+  private final String tableName;
+  private final URI specStoreURI;
+  private final SpecSerDe specSerDe;
+  private final String specStoreSource;
+
+  public MysqlSpecStore(Config config, SpecSerDe specSerDe) throws IOException 
{
+    if (config.hasPath(CONFIG_PREFIX)) {
+      config = config.getConfig(CONFIG_PREFIX).withFallback(config);
+    }
+
+    this.dataSource = MysqlDataSourceFactory.get(config, 
SharedResourcesBrokerFactory.getImplicitBroker());
+    this.tableName = 
config.getString(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY);
+    this.specStoreURI = 
URI.create(config.getString(ConfigurationKeys.STATE_STORE_DB_URL_KEY));
+    this.specSerDe = specSerDe;
+    this.specStoreSource = ConfigUtils.getString(config, SPEC_STORE_SOURCE, 
DEFAULT_SPEC_STORE_SOURCE);
+
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement = 
connection.prepareStatement(String.format(CREATE_TABLE_STATEMENT, 
this.tableName))) {
+      statement.executeUpdate();
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public boolean exists(URI specUri) throws IOException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement = 
connection.prepareStatement(String.format(EXISTS_STATEMENT, this.tableName))) {
+      statement.setString(1, specUri.toString());
+      try (ResultSet rs = statement.executeQuery()) {
+        rs.next();
+        return rs.getBoolean(1);
+      }
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void addSpec(Spec spec) throws IOException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement = 
connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
+
+      statement.setString(1, spec.getUri().toString());
+      statement.setString(2, this.specStoreSource);
+      statement.setBlob(3, new 
ByteArrayInputStream(this.specSerDe.serialize(spec)));
+      statement.executeUpdate();
+
+      connection.commit();
+    } catch (SQLException | SerializationException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public boolean deleteSpec(Spec spec) throws IOException {
+    return deleteSpec(spec.getUri());
+  }
+
+  @Override
+  public boolean deleteSpec(URI specUri) throws IOException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement = 
connection.prepareStatement(String.format(DELETE_STATEMENT, this.tableName))) {
+      statement.setString(1, specUri.toString());
+      int result = statement.executeUpdate();
+      connection.commit();
+      return result != 0;
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public boolean deleteSpec(URI specUri, String version) throws IOException {
+    return deleteSpec(specUri);
+  }
+
+  @Override
+  public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
+    addSpec(spec);
+    return spec;
+  }
+
+  @Override
+  public Spec getSpec(URI specUri) throws IOException, SpecNotFoundException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement = 
connection.prepareStatement(String.format(GET_STATEMENT, this.tableName))) {
+
+      statement.setString(1, specUri.toString());
+
+      try (ResultSet rs = statement.executeQuery()) {
+        if (!rs.next()) {
+          throw new SpecNotFoundException(specUri);
+        }
+
+        Blob blob = rs.getBlob(1);
+        return 
this.specSerDe.deserialize(ByteStreams.toByteArray(blob.getBinaryStream()));
+      }
+    } catch (SQLException | SerializationException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public Spec getSpec(URI specUri, String version) throws IOException, 
SpecNotFoundException {
+    return getSpec(specUri);
+  }
+
+  @Override
+  public Collection<Spec> getAllVersionsOfSpec(URI specUri) throws 
IOException, SpecNotFoundException {
+    return Lists.newArrayList(getSpec(specUri));
+  }
+
+  @Override
+  public Collection<Spec> getSpecs() throws IOException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement = 
connection.prepareStatement(String.format(GET_ALL_STATEMENT, this.tableName))) {
+
+      List<Spec> specs = new ArrayList<>();
+
+      try (ResultSet rs = statement.executeQuery()) {
+        while (rs.next()) {
+          try {
+            Blob blob = rs.getBlob(2);
+            
specs.add(this.specSerDe.deserialize(ByteStreams.toByteArray(blob.getBinaryStream())));
+          } catch (SQLException | SerializationException e) {
+            log.error("Failed to deserialize spec", e);
+          }
+        }
+      }
+
+      return specs;
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public Iterator<URI> getSpecURIs() throws IOException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement = 
connection.prepareStatement(String.format(GET_ALL_STATEMENT, this.tableName))) {
+
+      List<URI> specs = new ArrayList<>();
+
+      try (ResultSet rs = statement.executeQuery()) {
+        while (rs.next()) {
+          URI specURI = URI.create(rs.getString(1));
+          specs.add(specURI);
+        }
+      }
+
+      return specs.iterator();
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public Optional<URI> getSpecStoreURI() {
+    return Optional.of(this.specStoreURI);
+  }
+}
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
new file mode 100644
index 0000000..4d3c89d
--- /dev/null
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_store;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.SerializationUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+
+
+public class MysqlSpecStoreTest {
+  private static final String USER = "testUser";
+  private static final String PASSWORD = "testPassword";
+  private static final String TABLE = "spec_store";
+
+  private MysqlSpecStore specStore;
+  private URI uri1 = URI.create("flowspec1");
+  private URI uri2 = URI.create("flowspec2");
+  private URI uri3 = URI.create("flowspec3");
+  private FlowSpec flowSpec1 = FlowSpec.builder(this.uri1)
+      .withConfig(ConfigBuilder.create().addPrimitive("key", "value").build())
+      .withDescription("Test flow spec")
+      .withVersion("Test version")
+      .build();
+  private FlowSpec flowSpec2 = FlowSpec.builder(this.uri2)
+      .withConfig(ConfigBuilder.create().addPrimitive("key2", 
"value2").build())
+      .withDescription("Test flow spec 2")
+      .withVersion("Test version 2")
+      .build();
+  private FlowSpec flowSpec3 = FlowSpec.builder(this.uri3)
+      .withConfig(ConfigBuilder.create().addPrimitive("key3", 
"value3").build())
+      .withDescription("Test flow spec 3")
+      .withVersion("Test version 3")
+      .build();
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+    Config config = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, 
testDb.getJdbcUrl())
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+        .build();
+
+    this.specStore = new MysqlSpecStore(config, new TestSpecSerDe());
+  }
+
+  @Test
+  public void testAddSpec() throws Exception {
+    this.specStore.addSpec(this.flowSpec1);
+    this.specStore.addSpec(this.flowSpec2);
+
+    Assert.assertTrue(this.specStore.exists(this.uri1));
+    Assert.assertTrue(this.specStore.exists(this.uri2));
+    Assert.assertFalse(this.specStore.exists(URI.create("dummy")));
+  }
+
+  @Test
+  public void testGetSpec() throws Exception {
+    this.specStore.addSpec(this.flowSpec1);
+    this.specStore.addSpec(this.flowSpec2);
+
+    FlowSpec result = (FlowSpec) this.specStore.getSpec(this.uri1);
+    Assert.assertEquals(result, this.flowSpec1);
+
+    Collection<Spec> specs = this.specStore.getSpecs();
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+
+    Iterator<URI> uris = this.specStore.getSpecURIs();
+    Assert.assertTrue(Iterators.contains(uris, this.uri1));
+    Assert.assertTrue(Iterators.contains(uris, this.uri2));
+  }
+
+  @Test
+  public void testGetCorruptedSpec() throws Exception {
+    this.specStore.addSpec(this.flowSpec1);
+    this.specStore.addSpec(this.flowSpec2);
+    this.specStore.addSpec(this.flowSpec3);
+
+    Collection<Spec> specs = this.specStore.getSpecs();
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+    Assert.assertFalse(specs.contains(this.flowSpec3));
+  }
+
+  @Test
+  public void testDeleteSpec() throws Exception {
+    this.specStore.addSpec(this.flowSpec1);
+    Assert.assertTrue(this.specStore.exists(this.uri1));
+
+    this.specStore.deleteSpec(this.uri1);
+    Assert.assertFalse(this.specStore.exists(this.uri1));
+  }
+
+  public class TestSpecSerDe implements SpecSerDe {
+    @Override
+    public byte[] serialize(Spec spec) {
+      byte[] bytes = SerializationUtils.serialize(spec);
+      // Reverse bytes to simulate corrupted Spec
+      if (spec.getUri().equals(uri3)) {
+        ArrayUtils.reverse(bytes);
+      }
+      return bytes;
+    }
+
+    @Override
+    public Spec deserialize(byte[] spec) {
+      return SerializationUtils.deserialize(spec);
+    }
+  }
+}
\ No newline at end of file

Reply via email to