This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2502a1f [GOBBLIN-793] Separate SpecSerDe from SpecCatalogs and add
GsonSpecSerDe
2502a1f is described below
commit 2502a1ff1da1798ddb3804873bea07fd8755e06f
Author: Jack Moseley <[email protected]>
AuthorDate: Thu Jun 13 10:21:20 2019 -0700
[GOBBLIN-793] Separate SpecSerDe from SpecCatalogs and add GsonSpecSerDe
Closes #2658 from jack-moseley/spec-serde
---
.../gobblin/configuration/ConfigurationKeys.java | 1 +
.../org/apache/gobblin/runtime/api/SpecSerDe.java | 4 +-
.../gobblin/runtime/api/SpecSerDeException.java | 31 ++------
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 68 +++++++----------
.../runtime/spec_catalog/TopologyCatalog.java | 30 ++++----
.../runtime/spec_serde/FlowSpecDeserializer.java | 85 ++++++++++++++++++++++
.../runtime/spec_serde/FlowSpecSerializer.java | 81 +++++++++++++++++++++
.../runtime/spec_serde/GsonFlowSpecSerDe.java | 62 ++++++++++++++++
.../gobblin/runtime/spec_serde}/GsonSerDe.java | 2 +-
.../gobblin/runtime/spec_serde/JavaSpecSerDe.java | 49 +++++++++++++
.../gobblin/runtime/spec_store/MysqlSpecStore.java | 9 +--
.../spec_serde/FlowSpecSerializationTest.java | 70 ++++++++++++++++++
.../runtime/spec_store/MysqlSpecStoreTest.java | 12 +--
.../modules/orchestration/FSDagStateStore.java | 2 +-
.../modules/orchestration/MysqlDagStateStore.java | 2 +-
.../scheduler/GobblinServiceJobScheduler.java | 40 +++++-----
16 files changed, 431 insertions(+), 117 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 58e4255..8744c0d 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -894,6 +894,7 @@ public class ConfigurationKeys {
* Configuration properties related to TopologySpec Store
*/
public static final String TOPOLOGYSPEC_STORE_CLASS_KEY =
"topologySpec.store.class";
+ public static final String TOPOLOGYSPEC_SERDE_CLASS_KEY =
"topologySpec.serde.class";
public static final String TOPOLOGYSPEC_STORE_DIR_KEY =
"topologySpec.store.dir";
/***
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDe.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDe.java
index 85f38cc..d712322 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDe.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDe.java
@@ -26,12 +26,12 @@ public interface SpecSerDe {
* @param spec {@link Spec} to serialize.
* @return byte array of serialized {@link Spec}.
*/
- public byte[] serialize(Spec spec);
+ byte[] serialize(Spec spec) throws SpecSerDeException;
/***
* Deserialize byte array into a {@link Spec}.
* @param spec byte array to deserialize.
* @return deserialized {@link Spec}.
*/
- public Spec deserialize(byte[] spec);
+ Spec deserialize(byte[] spec) throws SpecSerDeException;
}
\ No newline at end of file
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDeException.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDeException.java
index 776ff6f..6097aa9 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDeException.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDeException.java
@@ -17,37 +17,20 @@
package org.apache.gobblin.runtime.api;
-import java.net.URI;
-
-
/**
* An exception when {@link Spec} cannot be correctly serialized/deserialized
from underlying storage.
*/
-public class SpecSerDeException extends Exception {
- private static final long serialVersionUID = 1L;
-
- /**
- * The URI that triggered SerDe error.
- * Could be a single Spec's URI or parent-level URI.
- */
- private final URI errorUri;
-
- public SpecSerDeException(URI errorUri) {
- super("Error occurred in loading of Spec with URI " + errorUri);
- this.errorUri = errorUri;
- }
+public class SpecSerDeException extends RuntimeException {
- public SpecSerDeException(URI errorUri, Throwable cause) {
- super("Error occurred in loading URI " + errorUri, cause);
- this.errorUri = errorUri;
+ public SpecSerDeException(Spec spec, Throwable cause) {
+ super("Error occurred when loading Spec with URI " + spec.getUri(), cause);
}
- public SpecSerDeException(String errorMsg, URI errorUri, Throwable cause) {
- super("Error occurred in loading URI " + errorUri + " with message:" +
errorMsg, cause);
- this.errorUri = errorUri;
+ public SpecSerDeException(Throwable cause) {
+ super("Error occurred when loading Spec", cause);
}
- public URI getErrorUri() {
- return errorUri;
+ public SpecSerDeException(String message) {
+ super(message);
}
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 5076092..1124137 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -17,10 +17,6 @@
package org.apache.gobblin.runtime.spec_catalog;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.typesafe.config.Config;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
@@ -31,13 +27,18 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import javax.annotation.Nonnull;
-import org.apache.commons.lang3.SerializationUtils;
+
import org.apache.commons.lang3.reflect.ConstructorUtils;
-import org.apache.gobblin.runtime.api.SpecSerDeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+
+import javax.annotation.Nonnull;
+
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
@@ -50,8 +51,10 @@ import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.runtime.api.SpecStore;
+import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
import org.apache.gobblin.runtime.spec_store.FSSpecStore;
import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.callbacks.CallbackResult;
import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
@@ -60,7 +63,7 @@ import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
* A service that interact with FlowSpec storage.
* The FlowSpec storage, a.k.a. {@link SpecStore} should be plugable with
different implementation.
*/
-public class FlowCatalog extends AbstractIdleService implements SpecCatalog,
MutableSpecCatalog, SpecSerDe {
+public class FlowCatalog extends AbstractIdleService implements SpecCatalog,
MutableSpecCatalog {
/***
* Configuration properties related to FlowSpec Store
@@ -68,6 +71,8 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
public static final String FLOWSPEC_STORE_CLASS_KEY = "flowSpec.store.class";
public static final String FLOWSPEC_STORE_DIR_KEY = "flowSpec.store.dir";
public static final String DEFAULT_FLOWSPEC_STORE_CLASS =
FSSpecStore.class.getCanonicalName();
+ public static final String FLOWSPEC_SERDE_CLASS_KEY = "flowSpec.serde.class";
+ public static final String DEFAULT_FLOWSPEC_SERDE_CLASS =
JavaSpecSerDe.class.getCanonicalName();
protected final SpecCatalogListenersList listeners;
protected final Logger log;
@@ -112,13 +117,15 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
newConfig = config.withValue(FSSpecStore.SPECSTORE_FS_DIR_KEY,
config.getValue(FLOWSPEC_STORE_DIR_KEY));
}
- String specStoreClassName = DEFAULT_FLOWSPEC_STORE_CLASS;
- if (config.hasPath(FLOWSPEC_STORE_CLASS_KEY)) {
- specStoreClassName = config.getString(FLOWSPEC_STORE_CLASS_KEY);
- }
+ String specStoreClassName = ConfigUtils.getString(config,
FLOWSPEC_STORE_CLASS_KEY, DEFAULT_FLOWSPEC_STORE_CLASS);
this.log.info(String.format("Using class name/alias [%s] for specstore",
specStoreClassName));
+ String specSerDeClassName = ConfigUtils.getString(config,
FLOWSPEC_SERDE_CLASS_KEY, DEFAULT_FLOWSPEC_SERDE_CLASS);
+ this.log.info(String.format("Using class name/alias [%s] for spec
serde", specSerDeClassName));
+
+ SpecSerDe specSerDe = (SpecSerDe)
ConstructorUtils.invokeConstructor(Class.forName(
+ new
ClassAliasResolver<>(SpecSerDe.class).resolve(specSerDeClassName)));
this.specStore = (SpecStore)
ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(
- specStoreClassName)), newConfig, this);
+ specStoreClassName)), newConfig, specSerDe);
} catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException
| ClassNotFoundException e) {
throw new RuntimeException(e);
@@ -149,8 +156,8 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
while (uriIterator.hasNext()) {
this.listeners.onAddSpec(getSpecWrapper(uriIterator.next()));
}
- } catch (SpecSerDeException ssde) {
- log.error("Cannot retrieve specs from catalog:", ssde);
+ } catch (IOException e) {
+ log.error("Cannot retrieve specs from catalog:", e);
}
}
@@ -167,8 +174,8 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
new
SpecCatalogListener.AddSpecCallback(getSpecWrapper(uriIterator.next()));
this.listeners.callbackOneListener(addJobCallback, specListener);
}
- } catch (SpecSerDeException ssde) {
- log.error("Cannot retrieve specs from catalog:", ssde);
+ } catch (IOException e) {
+ log.error("Cannot retrieve specs from catalog:", e);
}
}
}
@@ -222,21 +229,12 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
/* Catalog core functionality *
/**************************************************/
- public Iterator<URI> getSpecURIs() throws SpecSerDeException{
- try {
- return specStore.getSpecURIs();
- } catch (IOException ioe) {
- throw new SpecSerDeException("Cannot retrieve Specs' URI from Spec
Store", specStore.getSpecStoreURI().get(), ioe);
- }
+ public Iterator<URI> getSpecURIs() throws IOException {
+ return specStore.getSpecURIs();
}
- public Iterator<URI> getSpecURISWithTag(String tag) throws
SpecSerDeException {
- try {
- return specStore.getSpecURIsWithTag(tag);
- } catch (IOException ioe) {
- throw new SpecSerDeException( String.format("Cannot retrieve Specs' URI
with tag %s from Spec Store", tag),
- specStore.getSpecStoreURI().get(), ioe);
- }
+ public Iterator<URI> getSpecURISWithTag(String tag) throws IOException {
+ return specStore.getSpecURIsWithTag(tag);
}
/**
@@ -348,14 +346,4 @@ public class FlowCatalog extends AbstractIdleService
implements SpecCatalog, Mut
throw new RuntimeException("Cannot delete Spec from Spec store for URI:
" + uri, e);
}
}
-
- @Override
- public byte[] serialize(Spec spec) {
- return SerializationUtils.serialize(spec);
- }
-
- @Override
- public Spec deserialize(byte[] spec) {
- return SerializationUtils.deserialize(spec);
- }
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index 043dd04..4814a71 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -28,7 +28,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
-import org.apache.commons.lang3.SerializationUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,16 +56,19 @@ import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.runtime.api.SpecStore;
import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
import org.apache.gobblin.runtime.spec_store.FSSpecStore;
import org.apache.gobblin.util.ClassAliasResolver;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.callbacks.CallbackResult;
import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
@Alpha
-public class TopologyCatalog extends AbstractIdleService implements
SpecCatalog, MutableSpecCatalog, SpecSerDe {
+public class TopologyCatalog extends AbstractIdleService implements
SpecCatalog, MutableSpecCatalog {
public static final String DEFAULT_TOPOLOGYSPEC_STORE_CLASS =
FSSpecStore.class.getCanonicalName();
+ public static final String DEFAULT_TOPOLOGYSPEC_SERDE_CLASS =
JavaSpecSerDe.class.getCanonicalName();
protected final SpecCatalogListenersList listeners;
protected final Logger log;
@@ -114,13 +116,17 @@ public class TopologyCatalog extends AbstractIdleService
implements SpecCatalog,
newConfig = config.withValue(FSSpecStore.SPECSTORE_FS_DIR_KEY,
config.getValue(ConfigurationKeys.TOPOLOGYSPEC_STORE_DIR_KEY));
}
- String specStoreClassName = DEFAULT_TOPOLOGYSPEC_STORE_CLASS;
- if (config.hasPath(ConfigurationKeys.TOPOLOGYSPEC_STORE_CLASS_KEY)) {
- specStoreClassName =
config.getString(ConfigurationKeys.TOPOLOGYSPEC_STORE_CLASS_KEY);
- }
+ String specStoreClassName = ConfigUtils.getString(config,
ConfigurationKeys.TOPOLOGYSPEC_STORE_CLASS_KEY,
+ DEFAULT_TOPOLOGYSPEC_STORE_CLASS);
this.log.info("Using SpecStore class name/alias " + specStoreClassName);
+ String specSerDeClassName = ConfigUtils.getString(config,
ConfigurationKeys.TOPOLOGYSPEC_SERDE_CLASS_KEY,
+ DEFAULT_TOPOLOGYSPEC_SERDE_CLASS);
+ this.log.info("Using SpecSerDe class name/alias " + specSerDeClassName);
+
+ SpecSerDe specSerDe = (SpecSerDe)
ConstructorUtils.invokeConstructor(Class.forName(
+ new
ClassAliasResolver<>(SpecSerDe.class).resolve(specSerDeClassName)));
this.specStore = (SpecStore)
ConstructorUtils.invokeConstructor(Class.forName(this.aliasResolver.resolve(
- specStoreClassName)), newConfig, this);
+ specStoreClassName)), newConfig, specSerDe);
} catch (NoSuchMethodException | IllegalAccessException |
InvocationTargetException | InstantiationException
| ClassNotFoundException e) {
throw new RuntimeException(e);
@@ -269,14 +275,4 @@ public class TopologyCatalog extends AbstractIdleService
implements SpecCatalog,
throw new RuntimeException("Cannot delete Spec from Spec store for URI:
" + uri, e);
}
}
-
- @Override
- public byte[] serialize(Spec spec) {
- return SerializationUtils.serialize(spec);
- }
-
- @Override
- public Spec deserialize(byte[] spec) {
- return SerializationUtils.deserialize(spec);
- }
}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecDeserializer.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecDeserializer.java
new file mode 100644
index 0000000..65f867e
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecDeserializer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.lang.reflect.Type;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+
+
+public class FlowSpecDeserializer implements JsonDeserializer<FlowSpec> {
+ @Override
+ public FlowSpec deserialize(JsonElement json, Type typeOfT,
JsonDeserializationContext context) {
+ JsonObject jsonObject = json.getAsJsonObject();
+
+ String uri =
jsonObject.get(FlowSpecSerializer.FLOW_SPEC_URI_KEY).getAsString();
+ String version =
jsonObject.get(FlowSpecSerializer.FLOW_SPEC_VERSION_KEY).getAsString();
+ String description =
jsonObject.get(FlowSpecSerializer.FLOW_SPEC_DESCRIPTION_KEY).getAsString();
+ Config config =
ConfigFactory.parseString(jsonObject.get(FlowSpecSerializer.FLOW_SPEC_CONFIG_KEY).getAsString());
+
+ Properties properties = new Properties();
+ try {
+ properties.load(new
StringReader(jsonObject.get(FlowSpecSerializer.FLOW_SPEC_CONFIG_AS_PROPERTIES_KEY).getAsString()));
+ } catch (IOException e) {
+ throw new JsonParseException(e);
+ }
+
+ Set<URI> templateURIs = new HashSet<>();
+ try {
+ for (JsonElement template :
jsonObject.get(FlowSpecSerializer.FLOW_SPEC_TEMPLATE_URIS_KEY).getAsJsonArray())
{
+ templateURIs.add(new URI(template.getAsString()));
+ }
+ } catch (URISyntaxException e) {
+ throw new JsonParseException(e);
+ }
+
+ List<Spec> childSpecs = new ArrayList<>();
+ for (JsonElement spec :
jsonObject.get(FlowSpecSerializer.FLOW_SPEC_CHILD_SPECS_KEY).getAsJsonArray()) {
+ childSpecs.add(context.deserialize(spec, FlowSpec.class));
+ }
+
+ FlowSpec.Builder builder =
FlowSpec.builder(uri).withVersion(version).withDescription(description).withConfig(config)
+ .withConfigAsProperties(properties);
+ if (!templateURIs.isEmpty()) {
+ builder = builder.withTemplates(templateURIs);
+ }
+ if (!childSpecs.isEmpty()) {
+ builder = builder.withChildSpecs(childSpecs);
+ }
+
+ return builder.build();
+ }
+}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializer.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializer.java
new file mode 100644
index 0000000..85a5c34
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.lang.reflect.Type;
+import java.net.URI;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import com.typesafe.config.ConfigRenderOptions;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+
+
+public class FlowSpecSerializer implements JsonSerializer<FlowSpec> {
+ public static final String FLOW_SPEC_URI_KEY= "uri";
+ public static final String FLOW_SPEC_VERSION_KEY = "version";
+ public static final String FLOW_SPEC_DESCRIPTION_KEY = "description";
+ public static final String FLOW_SPEC_CONFIG_KEY = "config";
+ public static final String FLOW_SPEC_CONFIG_AS_PROPERTIES_KEY =
"configAsProperties";
+ public static final String FLOW_SPEC_TEMPLATE_URIS_KEY = "templateURIs";
+ public static final String FLOW_SPEC_CHILD_SPECS_KEY = "childSpecs";
+
+ @Override
+ public JsonElement serialize(FlowSpec src, Type typeOfSrc,
JsonSerializationContext context) {
+ JsonObject flowSpecJson = new JsonObject();
+
+ flowSpecJson.add(FLOW_SPEC_URI_KEY, context.serialize(src.getUri()));
+ flowSpecJson.add(FLOW_SPEC_VERSION_KEY,
context.serialize(src.getVersion()));
+ flowSpecJson.add(FLOW_SPEC_DESCRIPTION_KEY,
context.serialize(src.getDescription()));
+ flowSpecJson.add(FLOW_SPEC_CONFIG_KEY,
context.serialize(src.getConfig().root().render(ConfigRenderOptions.concise())));
+
+ StringWriter writer = new StringWriter();
+ try {
+ src.getConfigAsProperties().store(writer, "");
+ } catch (IOException e) {
+ throw new JsonParseException(e);
+ }
+ flowSpecJson.add(FLOW_SPEC_CONFIG_AS_PROPERTIES_KEY,
context.serialize(writer.getBuffer().toString()));
+
+ JsonArray templateURIs = new JsonArray();
+ if (src.getTemplateURIs().isPresent()) {
+ for (URI templateURI : src.getTemplateURIs().get()) {
+ templateURIs.add(context.serialize(templateURI));
+ }
+ }
+ flowSpecJson.add(FLOW_SPEC_TEMPLATE_URIS_KEY, templateURIs);
+
+ JsonArray childSpecs = new JsonArray();
+ if (src.getChildSpecs().isPresent()) {
+ for (Spec spec : src.getChildSpecs().get()) {
+ childSpecs.add(context.serialize(spec));
+ }
+ }
+ flowSpecJson.add(FLOW_SPEC_CHILD_SPECS_KEY, childSpecs);
+
+ return flowSpecJson;
+ }
+}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java
new file mode 100644
index 0000000..dd8d20d
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonFlowSpecSerDe.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import com.google.common.base.Charsets;
+import com.google.gson.Gson;
+import com.google.gson.JsonParseException;
+import com.google.gson.reflect.TypeToken;
+
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecSerDeException;
+
+
+/**
+ * {@link SpecSerDe} that serializes as Json using {@link Gson}. Note that
currently only {@link FlowSpec}s are supported.
+ */
+public class GsonFlowSpecSerDe implements SpecSerDe {
+ private GsonSerDe<FlowSpec> gsonSerDe;
+
+ public GsonFlowSpecSerDe() {
+ this.gsonSerDe = new GsonSerDe<>(new TypeToken<FlowSpec>(){}.getType(),
new FlowSpecSerializer(), new FlowSpecDeserializer());
+ }
+
+ @Override
+ public byte[] serialize(Spec spec) throws SpecSerDeException {
+ if (!(spec instanceof FlowSpec)) {
+ throw new SpecSerDeException("Failed to serialize spec " + spec.getUri()
+ ", only FlowSpec is supported");
+ }
+
+ try {
+ return this.gsonSerDe.serialize((FlowSpec)
spec).getBytes(Charsets.UTF_8);
+ } catch (JsonParseException e) {
+ throw new SpecSerDeException(spec, e);
+ }
+ }
+
+ @Override
+ public Spec deserialize(byte[] spec) throws SpecSerDeException {
+ try {
+ return this.gsonSerDe.deserialize(new String(spec, Charsets.UTF_8));
+ } catch (JsonParseException e) {
+ throw new SpecSerDeException(e);
+ }
+ }
+}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/GsonSerDe.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonSerDe.java
similarity index 97%
rename from
gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/GsonSerDe.java
rename to
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonSerDe.java
index be14e41..a943a76 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/GsonSerDe.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/GsonSerDe.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.gobblin.service.modules.spec;
+package org.apache.gobblin.runtime.spec_serde;
import java.lang.reflect.Type;
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JavaSpecSerDe.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JavaSpecSerDe.java
new file mode 100644
index 0000000..1bc2bbc
--- /dev/null
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/JavaSpecSerDe.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecSerDeException;
+
+
+/**
+ * {@link SpecSerDe} that does Java serialization using {@link
SerializationUtils}
+ */
+public class JavaSpecSerDe implements SpecSerDe {
+ @Override
+ public byte[] serialize(Spec spec) throws SpecSerDeException {
+ try {
+ return SerializationUtils.serialize(spec);
+ } catch (SerializationException e) {
+ throw new SpecSerDeException(spec, e);
+ }
+ }
+
+ @Override
+ public Spec deserialize(byte[] spec) throws SpecSerDeException {
+ try {
+ return SerializationUtils.deserialize(spec);
+ } catch (SerializationException e) {
+ throw new SpecSerDeException(e);
+ }
+ }
+}
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index f2c1fd9..657c18f 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -30,8 +30,6 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
-import org.apache.commons.lang3.SerializationException;
-
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
@@ -46,6 +44,7 @@ import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecSerDeException;
import org.apache.gobblin.runtime.api.SpecStore;
@@ -128,7 +127,7 @@ public class MysqlSpecStore implements SpecStore {
statement.executeUpdate();
connection.commit();
- } catch (SQLException | SerializationException e) {
+ } catch (SQLException | SpecSerDeException e) {
throw new IOException(e);
}
}
@@ -177,7 +176,7 @@ public class MysqlSpecStore implements SpecStore {
Blob blob = rs.getBlob(1);
return
this.specSerDe.deserialize(ByteStreams.toByteArray(blob.getBinaryStream()));
}
- } catch (SQLException | SerializationException e) {
+ } catch (SQLException | SpecSerDeException e) {
throw new IOException(e);
}
}
@@ -204,7 +203,7 @@ public class MysqlSpecStore implements SpecStore {
try {
Blob blob = rs.getBlob(2);
specs.add(this.specSerDe.deserialize(ByteStreams.toByteArray(blob.getBinaryStream())));
- } catch (SQLException | SerializationException e) {
+ } catch (SQLException | SpecSerDeException e) {
log.error("Failed to deserialize spec", e);
}
}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializationTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializationTest.java
new file mode 100644
index 0000000..1e8c42f
--- /dev/null
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializationTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_serde;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.runtime.api.FlowSpec;
+
+
+public class FlowSpecSerializationTest {
+ private Gson gson = new GsonBuilder().registerTypeAdapter(new
TypeToken<FlowSpec>() {}.getType(), new FlowSpecSerializer())
+ .registerTypeAdapter(new TypeToken<FlowSpec>() {}.getType(), new
FlowSpecDeserializer()).create();
+
+ private FlowSpec flowSpec1;
+ private FlowSpec flowSpec2;
+ private FlowSpec flowSpec3;
+
+ @BeforeClass
+ public void setUp() throws URISyntaxException {
+ gson = new GsonBuilder().registerTypeAdapter(new TypeToken<FlowSpec>()
{}.getType(), new FlowSpecSerializer())
+ .registerTypeAdapter(new TypeToken<FlowSpec>() {}.getType(), new
FlowSpecDeserializer()).create();
+
+ flowSpec1 =
FlowSpec.builder("flowspec1").withVersion("version1").withDescription("description1")
+ .withConfig(ConfigBuilder.create().addPrimitive("key1",
"value1").build()).build();
+ flowSpec2 =
FlowSpec.builder("flowspec2").withVersion("version2").withDescription("description2")
+ .withConfig(ConfigBuilder.create().addPrimitive("key2",
"value2").build()).build();
+ flowSpec3 =
FlowSpec.builder("flowspec3").withVersion("version3").withDescription("description3")
+ .withConfig(ConfigBuilder.create().addPrimitive("key3",
"value3").build())
+ .withTemplates(Arrays.asList(new URI("template1"), new
URI("template2")))
+ .withChildSpecs(Arrays.asList(flowSpec1, flowSpec2)).build();
+ }
+
+ @Test
+ public void testSerializeWithNoChildren() {
+ String json = gson.toJson(flowSpec1);
+ Assert.assertEquals(gson.fromJson(json, FlowSpec.class), flowSpec1);
+ }
+
+ @Test
+ public void testSerializeWithChildren() {
+ String json = gson.toJson(flowSpec3);
+ Assert.assertEquals(gson.fromJson(json, FlowSpec.class), flowSpec3);
+ }
+}
diff --git
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index 50c4aa3..10357b4 100644
---
a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++
b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -38,7 +38,8 @@ import
org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecSerDeException;
+import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
public class MysqlSpecStoreTest {
@@ -158,9 +159,9 @@ public class MysqlSpecStoreTest {
Assert.assertFalse(this.specStore.exists(this.uri1));
}
- public class TestSpecSerDe implements SpecSerDe {
+ public class TestSpecSerDe extends JavaSpecSerDe {
@Override
- public byte[] serialize(Spec spec) {
+ public byte[] serialize(Spec spec) throws SpecSerDeException {
byte[] bytes = SerializationUtils.serialize(spec);
// Reverse bytes to simulate corrupted Spec
if (spec.getUri().equals(uri3)) {
@@ -168,10 +169,5 @@ public class MysqlSpecStoreTest {
}
return bytes;
}
-
- @Override
- public Spec deserialize(byte[] spec) {
- return SerializationUtils.deserialize(spec);
- }
}
}
\ No newline at end of file
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
index 11e5f16..de8cd0b 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -38,8 +38,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.spec.GsonSerDe;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
import
org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
index 5e2bd83..1eb356a0 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlDagStateStore.java
@@ -29,8 +29,8 @@ import org.apache.gobblin.metastore.MysqlStateStore;
import org.apache.gobblin.metastore.MysqlStateStoreFactory;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.spec_serde.GsonSerDe;
import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.spec.GsonSerDe;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
import
org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index e14ce4c..a691811 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -17,25 +17,38 @@
package org.apache.gobblin.service.modules.scheduler;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.helix.HelixManager;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.InterruptableJob;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.UnableToInterruptJobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
-import java.net.URI;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
+
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
+
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
-import org.apache.gobblin.runtime.api.SpecSerDeException;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@@ -50,17 +63,8 @@ import
org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
-import org.apache.helix.HelixManager;
-import org.quartz.DisallowConcurrentExecution;
-import org.quartz.InterruptableJob;
-import org.quartz.JobDataMap;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.UnableToInterruptJobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static org.apache.gobblin.service.ServiceConfigKeys.*;
+import static
org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
/**
@@ -175,8 +179,8 @@ public class GobblinServiceJobScheduler extends
JobScheduler implements SpecCata
clearRunningFlowState(drUris);
}
- } catch (SpecSerDeException ssde) {
- throw new RuntimeException("Failed to get the iterator of all Spec
URIS", ssde);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to get the iterator of all Spec
URIS", e);
}
try {