This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ee331f  [GOBBLIN-786] Separate SerDe library in DagStateStore out for 
GaaS-wide sharing
5ee331f is described below

commit 5ee331f48ab4e0e5c69445cee7889a2d1d589693
Author: autumnust <[email protected]>
AuthorDate: Thu May 30 13:23:40 2019 -0700

    [GOBBLIN-786] Separate SerDe library in DagStateStore out for GaaS-wide 
sharing
    
    Closes #2651 from autumnust/mysqldag_store
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |  2 +-
 .../modules/orchestration/FSDagStateStore.java     | 21 ++++----
 .../gobblin/service/modules/spec/GsonSerDe.java    | 58 ++++++++++++++++++++++
 3 files changed, 70 insertions(+), 11 deletions(-)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 42baadb..3528cdb 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -22,7 +22,7 @@ import org.apache.gobblin.annotation.Alpha;
 @Alpha
 public class ServiceConfigKeys {
 
-  private static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";
+  public static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";
 
   // Gobblin Service Manager Keys
   public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = 
GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
index 338cea7..62789a8 100644
--- 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FSDagStateStore.java
@@ -29,8 +29,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
 import com.google.gson.JsonDeserializer;
 import com.google.gson.JsonSerializer;
 import com.google.gson.reflect.TypeToken;
@@ -41,6 +39,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.runtime.api.TopologySpec;
 import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.GsonSerDe;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
 import 
org.apache.gobblin.service.modules.spec.JobExecutionPlanListDeserializer;
@@ -52,11 +51,8 @@ import 
org.apache.gobblin.service.modules.spec.JobExecutionPlanListSerializer;
 public class FSDagStateStore implements DagStateStore {
   public static final String DAG_FILE_EXTENSION = ".dag";
 
-  /** Type token for ser/de JobExecutionPlan list */
-  private static final Type LIST_JOBEXECUTIONPLAN_TYPE = new 
TypeToken<List<JobExecutionPlan>>(){}.getType();
-
   private final String dagCheckpointDir;
-  private final Gson gson;
+  private final GsonSerDe<List<JobExecutionPlan>> serDe;
 
   public FSDagStateStore(Config config, Map<URI, TopologySpec> 
topologySpecMap) throws IOException {
     this.dagCheckpointDir = config.getString(DagManager.DAG_STATESTORE_DIR);
@@ -69,8 +65,13 @@ public class FSDagStateStore implements DagStateStore {
 
     JsonSerializer<List<JobExecutionPlan>> serializer = new 
JobExecutionPlanListSerializer();
     JsonDeserializer<List<JobExecutionPlan>> deserializer = new 
JobExecutionPlanListDeserializer(topologySpecMap);
-    this.gson = new 
GsonBuilder().registerTypeAdapter(LIST_JOBEXECUTIONPLAN_TYPE, serializer)
-        .registerTypeAdapter(LIST_JOBEXECUTIONPLAN_TYPE, 
deserializer).create();
+
+    /** The {@link Type} object must strictly match the generic 
arguments being used
+     * to define {@link GsonSerDe}.
+     * Due to type erasure, the {@link Type} needs to be initialized here instead 
of inside {@link GsonSerDe}.
+     * */
+    Type typeToken = new TypeToken<List<JobExecutionPlan>>(){}.getType();
+    this.serDe = new GsonSerDe<>(typeToken, serializer, deserializer);
   }
 
   /**
@@ -136,7 +137,7 @@ public class FSDagStateStore implements DagStateStore {
    */
   private String serializeDag(Dag<JobExecutionPlan> dag) {
     List<JobExecutionPlan> jobExecutionPlanList = 
dag.getNodes().stream().map(Dag.DagNode::getValue).collect(Collectors.toList());
-    return gson.toJson(jobExecutionPlanList, LIST_JOBEXECUTIONPLAN_TYPE);
+    return serDe.serialize(jobExecutionPlanList);
   }
 
   /**
@@ -145,6 +146,6 @@ public class FSDagStateStore implements DagStateStore {
    * @return a {@link Dag} parametrized by {@link JobExecutionPlan}.
    */
   private Dag<JobExecutionPlan> deserializeDag(String jsonDag) {
-    return new JobExecutionPlanDagFactory().createDag(gson.fromJson(jsonDag, 
LIST_JOBEXECUTIONPLAN_TYPE));
+    return new 
JobExecutionPlanDagFactory().createDag(serDe.deserialize(jsonDag));
   }
 }
diff --git 
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/GsonSerDe.java
 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/GsonSerDe.java
new file mode 100644
index 0000000..be14e41
--- /dev/null
+++ 
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/spec/GsonSerDe.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.service.modules.spec;
+
+import java.lang.reflect.Type;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonSerializer;
+
+
+/**
+ * SerDe library used in GaaS for {@link 
org.apache.gobblin.runtime.api.SpecStore} and
+ * {@link org.apache.gobblin.service.modules.orchestration.DagStateStore}.
+ *
+ * The solution is built on top of the {@link Gson} library.
+ * @param <T> The type of object to be serialized and deserialized.
+ */
+public class GsonSerDe<T> {
+  private final Gson gson;
+  private final JsonSerializer<T> serializer;
+  private final JsonDeserializer<T> deserializer;
+  private final Type type;
+
+  public GsonSerDe(Type type, JsonSerializer<T> serializer, 
JsonDeserializer<T> deserializer) {
+    this.serializer = serializer;
+    this.deserializer = deserializer;
+    this.type = type;
+
+    this.gson = new GsonBuilder().registerTypeAdapter(type, serializer)
+        .registerTypeAdapter(type, deserializer)
+        .create();
+  }
+
+  public String serialize(T object) {
+    return gson.toJson(object, type);
+  }
+
+  public T deserialize(String serializedObject) {
+    return gson.fromJson(serializedObject, type);
+  }
+}
\ No newline at end of file

Reply via email to