This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2502a1f  [GOBBLIN-793] Separate SpecSerDe from SpecCatalogs and add 
GsonSpecSerDe
2502a1f is described below

commit 2502a1ff1da1798ddb3804873bea07fd8755e06f
Author: Jack Moseley <[email protected]>
AuthorDate: Thu Jun 13 10:21:20 2019 -0700

    [GOBBLIN-793] Separate SpecSerDe from SpecCatalogs and add GsonSpecSerDe
    
    Closes #2658 from jack-moseley/spec-serde
---
 .../gobblin/configuration/ConfigurationKeys.java   |  1 +
 .../org/apache/gobblin/runtime/api/SpecSerDe.java  |  4 +-
 .../gobblin/runtime/api/SpecSerDeException.java    | 31 ++------
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  | 68 +++++++----------
 .../runtime/spec_catalog/TopologyCatalog.java      | 30 ++++----
 .../runtime/spec_serde/FlowSpecDeserializer.java   | 85 ++++++++++++++++++++++
 .../runtime/spec_serde/FlowSpecSerializer.java     | 81 +++++++++++++++++++++
 .../runtime/spec_serde/GsonFlowSpecSerDe.java      | 62 ++++++++++++++++
 .../gobblin/runtime/spec_serde}/GsonSerDe.java     |  2 +-
 .../gobblin/runtime/spec_serde/JavaSpecSerDe.java  | 49 +++++++++++++
 .../gobblin/runtime/spec_store/MysqlSpecStore.java |  9 +--
 .../spec_serde/FlowSpecSerializationTest.java      | 70 ++++++++++++++++++
 .../runtime/spec_store/MysqlSpecStoreTest.java     | 12 +--
 .../modules/orchestration/FSDagStateStore.java     |  2 +-
 .../modules/orchestration/MysqlDagStateStore.java  |  2 +-
 .../scheduler/GobblinServiceJobScheduler.java      | 40 +++++-----
 16 files changed, 431 insertions(+), 117 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 58e4255..8744c0d 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -894,6 +894,7 @@ public class ConfigurationKeys {
    * Configuration properties related to TopologySpec Store
    */
   public static final String TOPOLOGYSPEC_STORE_CLASS_KEY = 
"topologySpec.store.class";
+  public static final String TOPOLOGYSPEC_SERDE_CLASS_KEY = 
"topologySpec.serde.class";
   public static final String TOPOLOGYSPEC_STORE_DIR_KEY = 
"topologySpec.store.dir";
 
   /***
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDe.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDe.java
index 85f38cc..d712322 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDe.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDe.java
@@ -26,12 +26,12 @@ public interface SpecSerDe {
    * @param spec {@link Spec} to serialize.
    * @return byte array of serialized {@link Spec}.
    */
-  public byte[] serialize(Spec spec);
+  byte[] serialize(Spec spec) throws SpecSerDeException;
 
   /***
    * Deserialize byte array into a {@link Spec}.
    * @param spec byte array to deserialize.
    * @return deserialized {@link Spec}.
    */
-  public Spec deserialize(byte[] spec);
+  Spec deserialize(byte[] spec) throws SpecSerDeException;
 }
\ No newline at end of file
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDeException.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDeException.java
index 776ff6f..6097aa9 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDeException.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDeException.java
@@ -17,37 +17,20 @@
 
 package org.apache.gobblin.runtime.api;
 
-import java.net.URI;
-
-
 /**
  * An exception when {@link Spec} cannot be correctly serialized/deserialized 
from underlying storage.
  */
-public class SpecSerDeException extends Exception {
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * The URI that triggered SerDe error.
-   * Could be a single Spec's URI or parent-level URI.
-   */
-  private final URI errorUri;
-
-  public SpecSerDeException(URI errorUri) {
-    super("Error occurred in loading of Spec with URI " + errorUri);
-    this.errorUri = errorUri;
-  }
+public class SpecSerDeException extends RuntimeException {
 
-  public SpecSerDeException(URI errorUri, Throwable cause) {
-    super("Error occurred in loading URI " + errorUri, cause);
-    this.errorUri = errorUri;
+  public SpecSerDeException(Spec spec, Throwable cause) {
+    super("Error occurred when loading Spec with URI " + spec.getUri(), cause);
   }
 
-  public SpecSerDeException(String errorMsg, URI errorUri, Throwable cause) {
-    super("Error occurred in loading URI " + errorUri + " with message:" + 
errorMsg, cause);
-    this.errorUri = errorUri;
+  public SpecSerDeException(Throwable cause) {
+    super("Error occurred when loading Spec", cause);
   }
 
-  public URI getErrorUri() {
-    return errorUri;
+  public SpecSerDeException(String message) {
+    super(message);
   }
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 5076092..1124137 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -17,10 +17,6 @@
 
 package org.apache.gobblin.runtime.spec_catalog;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.typesafe.config.Config;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
@@ -31,13 +27,18 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import javax.annotation.Nonnull;
-import org.apache.commons.lang3.SerializationUtils;
+
 import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.runtime.api.SpecSerDeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import javax.annotation.Nonnull;
+
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
@@ -50,8 +51,10 @@ import org.apache.gobblin.runtime.api.SpecCatalogListener;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.SpecSerDe;
 import org.apache.gobblin.runtime.api.SpecStore;
+import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
 import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.callbacks.CallbackResult;
 import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
 
@@ -60,7 +63,7 @@ import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
  * A service that interact with FlowSpec storage.
  * The FlowSpec storage, a.k.a. {@link SpecStore} should be plugable with 
different implementation.
  */
-public class FlowCatalog extends AbstractIdleService implements SpecCatalog, 
MutableSpecCatalog, SpecSerDe {
+public class FlowCatalog extends AbstractIdleService implements SpecCatalog, 
MutableSpecCatalog {
 
   /***
    * Configuration properties related to FlowSpec Store
@@ -68,6 +71,8 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
   public static final String FLOWSPEC_STORE_CLASS_KEY = "flowSpec.store.class";
   public static final String FLOWSPEC_STORE_DIR_KEY = "flowSpec.store.dir";
   public static final String DEFAULT_FLOWSPEC_STORE_CLASS = 
FSSpecStore.class.getCanonicalName();
+  public static final String FLOWSPEC_SERDE_CLASS_KEY = "flowSpec.serde.class";
+  public static final String DEFAULT_FLOWSPEC_SERDE_CLASS = 
JavaSpecSerDe.class.getCanonicalName();
 
   protected final SpecCatalogListenersList listeners;
   protected final Logger log;
@@ -112,13 +117,15 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
         newConfig = config.withValue(FSSpecStore.SPECSTORE_FS_DIR_KEY,
             config.getValue(FLOWSPEC_STORE_DIR_KEY));
       }
-      String specStoreClassName = DEFAULT_FLOWSPEC_STORE_CLASS;
-      if (config.hasPath(FLOWSPEC_STORE_CLASS_KEY)) {
-        specStoreClassName = config.getString(FLOWSPEC_STORE_CLASS_KEY);
-      }
+      String specStoreClassName = ConfigUtils.getString(config, 
FLOWSPEC_STORE_CLASS_KEY, DEFAULT_FLOWSPEC_STORE_CLASS);
       this.log.info(String.format("Using class name/alias [%s] for specstore", 
specStoreClassName));
+      String specSerDeClassName = ConfigUtils.getString(config, 
FLOWSPEC_SERDE_CLASS_KEY, DEFAULT_FLOWSPEC_SERDE_CLASS);
+      this.log.info(String.format("Using class name/alias [%s] for spec 
serde", specSerDeClassName));
+
+      SpecSerDe specSerDe = (SpecSerDe) 
ConstructorUtils.invokeConstructor(Class.forName(
+          new 
ClassAliasResolver<>(SpecSerDe.class).resolve(specSerDeClassName)));
       this.specStore = (SpecStore) 
ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(
-          specStoreClassName)), newConfig, this);
+          specStoreClassName)), newConfig, specSerDe);
     } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException
         | ClassNotFoundException e) {
       throw new RuntimeException(e);
@@ -149,8 +156,8 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
       while (uriIterator.hasNext()) {
         this.listeners.onAddSpec(getSpecWrapper(uriIterator.next()));
       }
-    } catch (SpecSerDeException ssde) {
-      log.error("Cannot retrieve specs from catalog:", ssde);
+    } catch (IOException e) {
+      log.error("Cannot retrieve specs from catalog:", e);
     }
   }
 
@@ -167,8 +174,8 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
               new 
SpecCatalogListener.AddSpecCallback(getSpecWrapper(uriIterator.next()));
           this.listeners.callbackOneListener(addJobCallback, specListener);
         }
-      } catch (SpecSerDeException ssde) {
-        log.error("Cannot retrieve specs from catalog:", ssde);
+      } catch (IOException e) {
+        log.error("Cannot retrieve specs from catalog:", e);
       }
     }
   }
@@ -222,21 +229,12 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
   /* Catalog core functionality                     *
   /**************************************************/
 
-  public Iterator<URI> getSpecURIs() throws SpecSerDeException{
-    try {
-      return specStore.getSpecURIs();
-    } catch (IOException ioe) {
-      throw new SpecSerDeException("Cannot retrieve Specs' URI from Spec 
Store", specStore.getSpecStoreURI().get(), ioe);
-    }
+  public Iterator<URI> getSpecURIs() throws IOException {
+    return specStore.getSpecURIs();
   }
 
-  public Iterator<URI> getSpecURISWithTag(String tag) throws 
SpecSerDeException {
-    try {
-      return specStore.getSpecURIsWithTag(tag);
-    } catch (IOException ioe) {
-      throw new SpecSerDeException( String.format("Cannot retrieve Specs' URI 
with tag %s from Spec Store", tag),
-          specStore.getSpecStoreURI().get(), ioe);
-    }
+  public Iterator<URI> getSpecURISWithTag(String tag) throws IOException {
+    return specStore.getSpecURIsWithTag(tag);
   }
 
   /**
@@ -348,14 +346,4 @@ public class FlowCatalog extends AbstractIdleService 
implements SpecCatalog, Mut
       throw new RuntimeException("Cannot delete Spec from Spec store for URI: 
" + uri, e);
     }
   }
-
-  @Override
-  public byte[] serialize(Spec spec) {
-    return SerializationUtils.serialize(spec);
-  }
-
-  @Override
-  public Spec deserialize(byte[] spec) {
-    return SerializationUtils.deserialize(spec);
-  }
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index 043dd04..4814a71 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -28,7 +28,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,16 +56,19 @@ import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.SpecSerDe;
 import org.apache.gobblin.runtime.api.SpecStore;
 import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
 import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.callbacks.CallbackResult;
 import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
 
 
 @Alpha
-public class TopologyCatalog extends AbstractIdleService implements 
SpecCatalog, MutableSpecCatalog, SpecSerDe {
+public class TopologyCatalog extends AbstractIdleService implements 
SpecCatalog, MutableSpecCatalog {
 
   public static final String DEFAULT_TOPOLOGYSPEC_STORE_CLASS = 
FSSpecStore.class.getCanonicalName();
+  public static final String DEFAULT_TOPOLOGYSPEC_SERDE_CLASS = 
JavaSpecSerDe.class.getCanonicalName();
 
   protected final SpecCatalogListenersList listeners;
   protected final Logger log;
@@ -114,13 +116,17 @@ public class TopologyCatalog extends AbstractIdleService 
implements SpecCatalog,
         newConfig = config.withValue(FSSpecStore.SPECSTORE_FS_DIR_KEY,
             config.getValue(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY));
       }
-      String specStoreClassName = DEFAULT_TOPOLOGYSPEC_STORE_CLASS;
-      if (config.hasPath(ConfigurationKeys.TOPOLOGYSPEC_STORE_CLASS_KEY)) {
-        specStoreClassName = 
config.getString(ConfigurationKeys.TOPOLOGYSPEC_STORE_CLASS_KEY);
-      }
+      String specStoreClassName = ConfigUtils.getString(config, 
ConfigurationKeys.TOPOLOGYSPEC_STORE_CLASS_KEY,
+          DEFAULT_TOPOLOGYSPEC_STORE_CLASS);
       this.log.info("Using SpecStore class name/alias " + specStoreClassName);
+      String specSerDeClassName = ConfigUtils.getString(config, 
ConfigurationKeys.TOPOLOGYSPEC_SERDE_CLASS_KEY,
+          DEFAULT_TOPOLOGYSPEC_SERDE_CLASS);
+      this.log.info("Using SpecSerDe class name/alias " + specSerDeClassName);
+
+      SpecSerDe specSerDe = (SpecSerDe) 
ConstructorUtils.invokeConstructor(Class.forName(
+          new 
ClassAliasResolver<>(SpecSerDe.class).resolve(specSerDeClassName)));
       this.specStore = (SpecStore) 
ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(
-          specStoreClassName)), newConfig, this);
+          specStoreClassName)), newConfig, specSerDe);
     } catch (NoSuchMethodException | IllegalAccessException | 
InvocationTargetException | InstantiationException
         | ClassNotFoundException e) {
       throw new RuntimeException(e);
@@ -269,14 +275,4 @@ public class TopologyCatalog extends AbstractIdleService 
implements SpecCatalog,
       throw new RuntimeException("Cannot delete Spec from Spec store for URI: 
" + uri, e);
     }
   }
-
-  @Override
-  public byte[] serialize(Spec spec) {
-    return SerializationUtils.serialize(spec);
-  }
-
-  @Override
-  public Spec deserialize(byte[] spec) {
-    return SerializationUtils.deserialize(spec);
-  }
 }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecDeserializer.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecDeserializer.java
new file mode 100644
index 0000000..65f867e
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecDeserializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.Type;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+
+
+public class FlowSpecDeserializer implements JsonDeserializer<FlowSpec> {
+  @Override
+  public FlowSpec deserialize(JsonElement json, Type typeOfT, 
JsonDeserializationContext context) {
+    JsonObject jsonObject = json.getAsJsonObject();
+
+    String uri = 
jsonObject.get(FlowSpecSerializer.FLOW_SPEC_URI_KEY).getAsString();
+    String version = 
jsonObject.get(FlowSpecSerializer.FLOW_SPEC_VERSION_KEY).getAsString();
+    String description = 
jsonObject.get(FlowSpecSerializer.FLOW_SPEC_DESCRIPTION_KEY).getAsString();
+    Config config = 
ConfigFactory.parseString(jsonObject.get(FlowSpecSerializer.FLOW_SPEC_CONFIG_KEY).getAsString());
+
+    Properties properties = new Properties();
+    try {
+      properties.load(new 
StringReader(jsonObject.get(FlowSpecSerializer.FLOW_SPEC_CONFIG_AS_PROPERTIES_KEY).getAsString()));
+    } catch (IOException e) {
+      throw new JsonParseException(e);
+    }
+
+    Set<URI> templateURIs = new HashSet<>();
+    try {
+      for (JsonElement template : 
jsonObject.get(FlowSpecSerializer.FLOW_SPEC_TEMPLATE_URIS_KEY).getAsJsonArray())
 {
+        templateURIs.add(new URI(template.getAsString()));
+      }
+    } catch (URISyntaxException e) {
+      throw new JsonParseException(e);
+    }
+
+    List<Spec> childSpecs = new ArrayList<>();
+    for (JsonElement spec : 
jsonObject.get(FlowSpecSerializer.FLOW_SPEC_CHILD_SPECS_KEY).getAsJsonArray()) {
+      childSpecs.add(context.deserialize(spec, FlowSpec.class));
+    }
+
+    FlowSpec.Builder builder = 
FlowSpec.builder(uri).withVersion(version).withDescription(description).withConfig(config)
+        .withConfigAsProperties(properties);
+    if (!templateURIs.isEmpty()) {
+      builder = builder.withTemplates(templateURIs);
+    }
+    if (!childSpecs.isEmpty()) {
+      builder = builder.withChildSpecs(childSpecs);
+    }
+
+    return builder.build();
+  }
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializer.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializer.java
new file mode 100644
index 0000000..85a5c34
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.lang.reflect.Type;
+import java.net.URI;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import com.typesafe.config.ConfigRenderOptions;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+
+
+public class FlowSpecSerializer implements JsonSerializer<FlowSpec> {
+  public static final String FLOW_SPEC_URI_KEY= "uri";
+  public static final String FLOW_SPEC_VERSION_KEY = "version";
+  public static final String FLOW_SPEC_DESCRIPTION_KEY = "description";
+  public static final String FLOW_SPEC_CONFIG_KEY = "config";
+  public static final String FLOW_SPEC_CONFIG_AS_PROPERTIES_KEY = 
"configAsProperties";
+  public static final String FLOW_SPEC_TEMPLATE_URIS_KEY = "templateURIs";
+  public static final String FLOW_SPEC_CHILD_SPECS_KEY = "childSpecs";
+
+  @Override
+  public JsonElement serialize(FlowSpec src, Type typeOfSrc, 
JsonSerializationContext context) {
+    JsonObject flowSpecJson = new JsonObject();
+
+    flowSpecJson.add(FLOW_SPEC_URI_KEY, context.serialize(src.getUri()));
+    flowSpecJson.add(FLOW_SPEC_VERSION_KEY, 
context.serialize(src.getVersion()));
+    flowSpecJson.add(FLOW_SPEC_DESCRIPTION_KEY, 
context.serialize(src.getDescription()));
+    flowSpecJson.add(FLOW_SPEC_CONFIG_KEY, 
context.serialize(src.getConfig().root().render(ConfigRenderOptions.concise())));
+
+    StringWriter writer = new StringWriter();
+    try {
+      src.getConfigAsProperties().store(writer, "");
+    } catch (IOException e) {
+      throw new JsonParseException(e);
+    }
+    flowSpecJson.add(FLOW_SPEC_CONFIG_AS_PROPERTIES_KEY, 
context.serialize(writer.getBuffer().toString()));
+
+    JsonArray templateURIs = new JsonArray();
+    if (src.getTemplateURIs().isPresent()) {
+      for (URI templateURI : src.getTemplateURIs().get()) {
+        templateURIs.add(context.serialize(templateURI));
+      }
+    }
+    flowSpecJson.add(FLOW_SPEC_TEMPLATE_URIS_KEY, templateURIs);
+
+    JsonArray childSpecs = new JsonArray();
+    if (src.getChildSpecs().isPresent()) {
+      for (Spec spec : src.getChildSpecs().get()) {
+        childSpecs.add(context.serialize(spec));
+      }
+    }
+    flowSpecJson.add(FLOW_SPEC_CHILD_SPECS_KEY, childSpecs);
+
+    return flowSpecJson;
+  }
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java
new file mode 100644
index 0000000..dd8d20d
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import com.google.common.base.Charsets;
+import com.google.gson.Gson;
+import com.google.gson.JsonParseException;
+import com.google.gson.reflect.TypeToken;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecSerDeException;
+
+
+/**
+ * {@link SpecSerDe} that serializes as Json using {@link Gson}. Note that 
currently only {@link FlowSpec}s are supported.
+ */
+public class GsonFlowSpecSerDe implements SpecSerDe {
+  private GsonSerDe<FlowSpec> gsonSerDe;
+
+  public GsonFlowSpecSerDe() {
+    this.gsonSerDe = new GsonSerDe<>(new TypeToken<FlowSpec>(){}.getType(), 
new FlowSpecSerializer(), new FlowSpecDeserializer());
+  }
+
+  @Override
+  public byte[] serialize(Spec spec) throws SpecSerDeException {
+    if (!(spec instanceof FlowSpec)) {
+      throw new SpecSerDeException("Failed to serialize spec " + spec.getUri() 
+ ", only FlowSpec is supported");
+    }
+
+    try {
+      return this.gsonSerDe.serialize((FlowSpec) 
spec).getBytes(Charsets.UTF_8);
+    } catch (JsonParseException e) {
+      throw new SpecSerDeException(spec, e);
+    }
+  }
+
+  @Override
+  public Spec deserialize(byte[] spec) throws SpecSerDeException {
+    try {
+      return this.gsonSerDe.deserialize(new String(spec, Charsets.UTF_8));
+    } catch (JsonParseException e) {
+      throw new SpecSerDeException(e);
+    }
+  }
+}
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/GsonSerDe.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonSerDe.java
similarity index 97%
rename from 
gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/GsonSerDe.java
rename to 
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonSerDe.java
index be14e41..a943a76 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/GsonSerDe.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonSerDe.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.service.modules.spec;
+package org.apache.gobblin.runtime.spec_serde;
 
 import java.lang.reflect.Type;
 
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JavaSpecSerDe.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JavaSpecSerDe.java
new file mode 100644
index 0000000..1bc2bbc
--- /dev/null
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JavaSpecSerDe.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecSerDeException;
+
+
+/**
+ * {@link SpecSerDe} that does Java serialization using {@link 
SerializationUtils}
+ */
+public class JavaSpecSerDe implements SpecSerDe {
+  @Override
+  public byte[] serialize(Spec spec) throws SpecSerDeException {
+    try {
+      return SerializationUtils.serialize(spec);
+    } catch (SerializationException e) {
+      throw new SpecSerDeException(spec, e);
+    }
+  }
+
+  @Override
+  public Spec deserialize(byte[] spec) throws SpecSerDeException {
+    try {
+      return SerializationUtils.deserialize(spec);
+    } catch (SerializationException e) {
+      throw new SpecSerDeException(e);
+    }
+  }
+}
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index f2c1fd9..657c18f 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -30,8 +30,6 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.commons.lang3.SerializationException;
-
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.io.ByteStreams;
@@ -46,6 +44,7 @@ import org.apache.gobblin.metastore.MysqlDataSourceFactory;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecSerDeException;
 import org.apache.gobblin.runtime.api.SpecStore;
 
 
@@ -128,7 +127,7 @@ public class MysqlSpecStore implements SpecStore {
       statement.executeUpdate();
 
       connection.commit();
-    } catch (SQLException | SerializationException e) {
+    } catch (SQLException | SpecSerDeException e) {
       throw new IOException(e);
     }
   }
@@ -177,7 +176,7 @@ public class MysqlSpecStore implements SpecStore {
         Blob blob = rs.getBlob(1);
         return 
this.specSerDe.deserialize(ByteStreams.toByteArray(blob.getBinaryStream()));
       }
-    } catch (SQLException | SerializationException e) {
+    } catch (SQLException | SpecSerDeException e) {
       throw new IOException(e);
     }
   }
@@ -204,7 +203,7 @@ public class MysqlSpecStore implements SpecStore {
           try {
             Blob blob = rs.getBlob(2);
             
specs.add(this.specSerDe.deserialize(ByteStreams.toByteArray(blob.getBinaryStream())));
-          } catch (SQLException | SerializationException e) {
+          } catch (SQLException | SpecSerDeException e) {
             log.error("Failed to deserialize spec", e);
           }
         }
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializationTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializationTest.java
new file mode 100644
index 0000000..1e8c42f
--- /dev/null
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializationTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.runtime.api.FlowSpec;
+
+
+public class FlowSpecSerializationTest {
+  private Gson gson = new GsonBuilder().registerTypeAdapter(new 
TypeToken<FlowSpec>() {}.getType(), new FlowSpecSerializer())
+      .registerTypeAdapter(new TypeToken<FlowSpec>() {}.getType(), new 
FlowSpecDeserializer()).create();
+
+  private FlowSpec flowSpec1;
+  private FlowSpec flowSpec2;
+  private FlowSpec flowSpec3;
+
+  @BeforeClass
+  public void setUp() throws URISyntaxException {
+    gson = new GsonBuilder().registerTypeAdapter(new TypeToken<FlowSpec>() 
{}.getType(), new FlowSpecSerializer())
+        .registerTypeAdapter(new TypeToken<FlowSpec>() {}.getType(), new 
FlowSpecDeserializer()).create();
+
+    flowSpec1 = 
FlowSpec.builder("flowspec1").withVersion("version1").withDescription("description1")
+        .withConfig(ConfigBuilder.create().addPrimitive("key1", 
"value1").build()).build();
+    flowSpec2 = 
FlowSpec.builder("flowspec2").withVersion("version2").withDescription("description2")
+        .withConfig(ConfigBuilder.create().addPrimitive("key2", 
"value2").build()).build();
+    flowSpec3 = 
FlowSpec.builder("flowspec3").withVersion("version3").withDescription("description3")
+        .withConfig(ConfigBuilder.create().addPrimitive("key3", 
"value3").build())
+        .withTemplates(Arrays.asList(new URI("template1"), new 
URI("template2")))
+        .withChildSpecs(Arrays.asList(flowSpec1, flowSpec2)).build();
+  }
+
+  @Test
+  public void testSerializeWithNoChildren() {
+    String json = gson.toJson(flowSpec1);
+    Assert.assertEquals(gson.fromJson(json, FlowSpec.class), flowSpec1);
+  }
+
+  @Test
+  public void testSerializeWithChildren() {
+    String json = gson.toJson(flowSpec3);
+    Assert.assertEquals(gson.fromJson(json, FlowSpec.class), flowSpec3);
+  }
+}
diff --git 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index 50c4aa3..10357b4 100644
--- 
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++ 
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -38,7 +38,8 @@ import 
org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecSerDeException;
+import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
 
 
 public class MysqlSpecStoreTest {
@@ -158,9 +159,9 @@ public class MysqlSpecStoreTest {
     Assert.assertFalse(this.specStore.exists(this.uri1));
   }
 
-  public class TestSpecSerDe implements SpecSerDe {
+  public class TestSpecSerDe extends JavaSpecSerDe {
     @Override
-    public byte[] serialize(Spec spec) {
+    public byte[] serialize(Spec spec) throws SpecSerDeException {
       byte[] bytes = SerializationUtils.serialize(spec);
       // Reverse bytes to simulate corrupted Spec
       if (spec.getUri().equals(uri3)) {
@@ -168,10 +169,5 @@ public class MysqlSpecStoreTest {
       }
       return bytes;
     }
-
-    @Override
-    public Spec deserialize(byte[] spec) {
-      return SerializationUtils.deserialize(spec);
-    }
   }
 }
\ No newline at end of file
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
index 11e5f16..de8cd0b 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -38,8 +38,8 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.spec.GsonSerDe;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 import 
org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
index 5e2bd83..1eb356a0 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
@@ -29,8 +29,8 @@ import org.apache.gobblin.metastore.MysqlStateStore;
 import org.apache.gobblin.metastore.MysqlStateStoreFactory;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.spec.GsonSerDe;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 import 
org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index e14ce4c..a691811 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -17,25 +17,38 @@
 
 package org.apache.gobblin.service.modules.scheduler;
 
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.helix.HelixManager;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.InterruptableJob;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.UnableToInterruptJobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
-import java.net.URI;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
-import org.apache.gobblin.runtime.api.SpecSerDeException;
 import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@@ -50,17 +63,8 @@ import 
org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
-import org.apache.helix.HelixManager;
-import org.quartz.DisallowConcurrentExecution;
-import org.quartz.InterruptableJob;
-import org.quartz.JobDataMap;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.UnableToInterruptJobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import static org.apache.gobblin.service.ServiceConfigKeys.*;
+import static 
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
 
 
 /**
@@ -175,8 +179,8 @@ public class GobblinServiceJobScheduler extends 
JobScheduler implements SpecCata
         clearRunningFlowState(drUris);
       }
 
-    } catch (SpecSerDeException ssde) {
-      throw new RuntimeException("Failed to get the iterator of all Spec 
URIS", ssde);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to get the iterator of all Spec 
URIS", e);
     }
 
     try {

Reply via email to