This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5ee331f [GOBBLIN-786] Separate SerDe library in DagStateStore out for
GaaS-wide sharing
5ee331f is described below
commit 5ee331f48ab4e0e5c69445cee7889a2d1d589693
Author: autumnust <[email protected]>
AuthorDate: Thu May 30 13:23:40 2019 -0700
[GOBBLIN-786] Separate SerDe library in DagStateStore out for GaaS-wide
sharing
Closes #2651 from autumnust/mysqldag_store
---
.../apache/gobblin/service/ServiceConfigKeys.java | 2 +-
.../modules/orchestration/FSDagStateStore.java | 21 ++++----
.../gobblin/service/modules/spec/GsonSerDe.java | 58 ++++++++++++++++++++++
3 files changed, 70 insertions(+), 11 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 42baadb..3528cdb 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -22,7 +22,7 @@ import org.apache.gobblin.annotation.Alpha;
@Alpha
public class ServiceConfigKeys {
- private static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";
+ public static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";
// Gobblin Service Manager Keys
public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY =
GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
index 338cea7..62789a8 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -29,8 +29,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonSerializer;
import com.google.gson.reflect.TypeToken;
@@ -41,6 +39,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.GsonSerDe;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
import
org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
@@ -52,11 +51,8 @@ import
org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
public class FSDagStateStore implements DagStateStore {
public static final String DAG_FILE_EXTENSION = ".dag";
- /** Type token for ser/de JobExecutionPlan list */
- private static final Type LIST_JOBEXECUTIONPLAN_TYPE = new
TypeToken<List<JobExecutionPlan>>(){}.getType();
-
private final String dagCheckpointDir;
- private final Gson gson;
+ private final GsonSerDe<List<JobExecutionPlan>> serDe;
public FSDagStateStore(Config config, Map<URI, TopologySpec>
topologySpecMap) throws IOException {
this.dagCheckpointDir = config.getString(DagManager.DAG_STATESTORE_DIR);
@@ -69,8 +65,13 @@ public class FSDagStateStore implements DagStateStore {
JsonSerializer<List<JobExecutionPlan>> serializer = new
JobExecutionPlanListSerializer();
JsonDeserializer<List<JobExecutionPlan>> deserializer = new
JobExecutionPlanListDeserializer(topologySpecMap);
- this.gson = new
GsonBuilder().registerTypeAdapter(LIST_JOBEXECUTIONPLAN_TYPE, serializer)
- .registerTypeAdapter(LIST_JOBEXECUTIONPLAN_TYPE,
deserializer).create();
+
+ /** {@link Type} object will need to strictly match with the generic
arguments being used
+ * to define {@link GsonSerDe}
+ * Due to type erasure, the {@link Type} needs to be initialized here instead
of inside {@link GsonSerDe}.
+ * */
+ Type typeToken = new TypeToken<List<JobExecutionPlan>>(){}.getType();
+ this.serDe = new GsonSerDe<>(typeToken, serializer, deserializer);
}
/**
@@ -136,7 +137,7 @@ public class FSDagStateStore implements DagStateStore {
*/
private String serializeDag(Dag<JobExecutionPlan> dag) {
List<JobExecutionPlan> jobExecutionPlanList =
dag.getNodes().stream().map(Dag.DagNode::getValue).collect(Collectors.toList());
- return gson.toJson(jobExecutionPlanList, LIST_JOBEXECUTIONPLAN_TYPE);
+ return serDe.serialize(jobExecutionPlanList);
}
/**
@@ -145,6 +146,6 @@ public class FSDagStateStore implements DagStateStore {
* @return a {@link Dag} parametrized by {@link JobExecutionPlan}.
*/
private Dag<JobExecutionPlan> deserializeDag(String jsonDag) {
- return new JobExecutionPlanDagFactory().createDag(gson.fromJson(jsonDag,
LIST_JOBEXECUTIONPLAN_TYPE));
+ return new
JobExecutionPlanDagFactory().createDag(serDe.deserialize(jsonDag));
}
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/GsonSerDe.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/GsonSerDe.java
new file mode 100644
index 0000000..be14e41
--- /dev/null
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/GsonSerDe.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.spec;
+
+import java.lang.reflect.Type;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonSerializer;
+
+
+/**
+ * SerDe library used in GaaS for {@link
org.apache.gobblin.runtime.api.SpecStore} and
+ * {@link org.apache.gobblin.service.modules.orchestration.DagStateStore}.
+ *
+ * The solution is built on top of the {@link Gson} library.
+ * @param <T> The type of object to be serialized.
+ */
+public class GsonSerDe<T> {
+ private final Gson gson;
+ private final JsonSerializer<T> serializer;
+ private final JsonDeserializer<T> deserializer;
+ private final Type type;
+
+ public GsonSerDe(Type type, JsonSerializer<T> serializer,
JsonDeserializer<T> deserializer) {
+ this.serializer = serializer;
+ this.deserializer = deserializer;
+ this.type = type;
+
+ this.gson = new GsonBuilder().registerTypeAdapter(type, serializer)
+ .registerTypeAdapter(type, deserializer)
+ .create();
+ }
+
+ public String serialize(T object) {
+ return gson.toJson(object, type);
+ }
+
+ public T deserialize(String serializedObject) {
+ return gson.fromJson(serializedObject, type);
+ }
+}
\ No newline at end of file