This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dba236  SUBMARINE-373. Upgrade cluster module dependent library
7dba236 is described below

commit 7dba2360d8c3e9a005f1b31666801c5bf9b8d34b
Author: Xun Liu <[email protected]>
AuthorDate: Tue Feb 4 23:17:07 2020 +0800

    SUBMARINE-373. Upgrade cluster module dependent library
    
    ### What is this PR for?
    The Atomix Raft library used by the submarine cluster module is version 3.0,
    which needs to know the IP addresses of all servers in the cluster in order to create a cluster.
    
    In the k8s environment, since the IP of a pod is not fixed, we need to 
upgrade to Atomix 3.1.0 or later and create the cluster through broadcast 
discovery.
    
    ### What type of PR is it?
    [Refactoring]
    
    ### Todos
    * [ ] - Create a cluster through broadcast discovery.
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/SUBMARINE-373
    
    ### How should this be tested?
    * https://travis-ci.org/liuxunorg/submarine/builds/645828692
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Xun Liu <[email protected]>
    
    Closes #172 from liuxunorg/SUBMARINE-373 and squashes the following commits:
    
    55addda [Xun Liu] relocation
    43c3667 [Xun Liu] fixed exclude netty problem
    197c86f [Xun Liu] SUBMARINE-373. Upgrade cluster module dependent library 
version
---
 pom.xml                                            |  5 ++-
 .../submarine/commons/cluster/ClusterManager.java  | 51 ++++++++++++++++------
 .../submarine/commons/cluster/ClusterServer.java   | 27 ++++++++----
 .../commons/cluster/UnicastServiceAdapter.java     | 42 ++++++++++++++++++
 submarine-commons/commons-metastore/pom.xml        |  8 ++++
 submarine-dist/src/assembly/distribution.xml       | 13 +++++-
 6 files changed, 122 insertions(+), 24 deletions(-)

diff --git a/pom.xml b/pom.xml
index 99f1b60..4059689 100644
--- a/pom.xml
+++ b/pom.xml
@@ -124,7 +124,7 @@
     <derby.version>10.15.1.3</derby.version>
     <zeppelin.version>0.9.0-SNAPSHOT</zeppelin.version>
     <jgit.version>5.5.1.201910021850-r</jgit.version>
-    <atomix.version>3.0.0-rc4</atomix.version>
+    <atomix.version>3.1.5</atomix.version>
     <spark.scala.version>2.11.8</spark.scala.version>
     <spark.scala.binary.version>2.11</spark.scala.binary.version>
     <hive.version>2.3.6</hive.version>
@@ -138,7 +138,8 @@
   <modules>
     <module>submarine-commons</module>
     <module>submarine-client</module>
-    <module>submarine-cloud</module>
+    <!-- There is a problem with the submarine-cloud pom, the compilation is 
very slow, and it is temporarily closed
+    <module>submarine-cloud</module-->
     <module>submodules/tony</module>
     <module>submarine-server</module>
     <module>submarine-all</module>
diff --git 
a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
 
b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
index 508bd57..1bfc2bf 100644
--- 
a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
+++ 
b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
@@ -19,6 +19,7 @@ package org.apache.submarine.commons.cluster;
 import com.google.common.collect.Maps;
 import io.atomix.cluster.MemberId;
 import io.atomix.cluster.Node;
+import io.atomix.cluster.messaging.MessagingConfig;
 import io.atomix.cluster.messaging.MessagingService;
 import io.atomix.cluster.messaging.impl.NettyMessagingService;
 import io.atomix.primitive.operation.OperationType;
@@ -149,13 +150,15 @@ public abstract class ClusterManager {
 
   protected ClusterMonitor clusterMonitor = null;
 
+  protected String MESSAGING_SERVICE_NAME = "SubmarineCluster";
+
   protected boolean isTest = false;
 
   protected ClusterManager() {
     try {
       this.serverHost = NetworkUtils.findAvailableHostAddress();
       String clusterAddr = sconf.getClusterAddress();
-      LOG.info(this.getClass().toString() + "::clusterAddr = {}", clusterAddr);
+      LOG.info("clusterAddr = {}", clusterAddr);
       if (!StringUtils.isEmpty(clusterAddr)) {
         String cluster[] = clusterAddr.split(",");
 
@@ -204,13 +207,13 @@ public abstract class ClusterManager {
       return;
     }
 
-    LOG.info(this.getClass().toString() + "::ClusterManager::start()");
+    LOG.info("ClusterManager::start()");
 
     // RaftClient Thread
     new Thread(new Runnable() {
       @Override
       public void run() {
-        LOG.info(this.getClass().toString() + "::RaftClientThread run() >>>");
+        LOG.info("RaftClientThread run() >>>");
 
         int raftClientPort = 0;
         try {
@@ -219,12 +222,13 @@ public abstract class ClusterManager {
           LOG.error(e.getMessage(), e);
         }
 
+        LOG.info("RaftClientThread {}:{}", serverHost, raftClientPort);
         MemberId memberId = MemberId.from(serverHost + ":" + raftClientPort);
         Address address = Address.from(serverHost, raftClientPort);
         raftAddressMap.put(memberId, address);
 
-        MessagingService messagingManager
-            = 
NettyMessagingService.builder().withAddress(address).build().start().join();
+        MessagingService messagingManager = new NettyMessagingService(
+            MESSAGING_SERVICE_NAME, address, new 
MessagingConfig()).start().join();
         RaftClientProtocol protocol = new RaftClientMessagingProtocol(
             messagingManager, protocolSerializer, raftAddressMap::get);
 
@@ -238,7 +242,7 @@ public abstract class ClusterManager {
 
         raftSessionClient = createProxy(raftClient);
 
-        LOG.info(this.getClass().toString() + "::RaftClientThread run() <<<");
+        LOG.info("RaftClientThread run() <<<");
       }
     }).start();
 
@@ -255,8 +259,7 @@ public abstract class ClusterManager {
               while (!raftInitialized()) {
                 retry++;
                 if (0 == retry % 30) {
-                  LOG.warn(this.getClass().toString()
-                      + "::Raft incomplete initialization! retry[{}]", retry);
+                  LOG.warn("Raft incomplete initialization! retry[{}]", retry);
                 }
                 Thread.sleep(100);
               }
@@ -272,11 +275,9 @@ public abstract class ClusterManager {
               if (true == success) {
                 // The operation was successfully deleted
                 clusterMetaQueue.remove(metaEntity);
-                LOG.info(this.getClass().toString()
-                    + "::Cluster Meta Consume success! {}", metaEntity);
+                LOG.info("Cluster Meta Consume success! {}", metaEntity);
               } else {
-                LOG.error(this.getClass().toString()
-                    + "::Cluster Meta Consume faild!");
+                LOG.error("Cluster Meta Consume faild!");
               }
             } else {
               Thread.sleep(100);
@@ -424,12 +425,36 @@ public abstract class ClusterManager {
     }
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug(this.getClass().toString() + "::getClusterMeta >>> {}", 
clusterMeta.toString());
+      LOG.debug("getClusterMeta >>> {}", clusterMeta.toString());
     }
 
     return clusterMeta;
   }
 
+  protected static final Namespace storageNamespace = Namespace.builder()
+      .register(CloseSessionEntry.class)
+      .register(CommandEntry.class)
+      .register(ConfigurationEntry.class)
+      .register(InitializeEntry.class)
+      .register(KeepAliveEntry.class)
+      .register(MetadataEntry.class)
+      .register(OpenSessionEntry.class)
+      .register(QueryEntry.class)
+      .register(PrimitiveOperation.class)
+      .register(DefaultOperationId.class)
+      .register(OperationType.class)
+      .register(ReadConsistency.class)
+      .register(ArrayList.class)
+      .register(HashSet.class)
+      .register(DefaultRaftMember.class)
+      .register(MemberId.class)
+      .register(RaftMember.Type.class)
+      .register(Instant.class)
+      .register(Configuration.class)
+      .register(byte[].class)
+      .register(long[].class)
+      .build();
+
   protected static final Serializer protocolSerializer = 
Serializer.using(Namespace.builder()
       .register(OpenSessionRequest.class)
       .register(OpenSessionResponse.class)
diff --git 
a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterServer.java
 
b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterServer.java
index 47300dc..d622f0c 100644
--- 
a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterServer.java
+++ 
b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterServer.java
@@ -21,19 +21,23 @@ import io.atomix.cluster.BootstrapService;
 import io.atomix.cluster.ManagedClusterMembershipService;
 import io.atomix.cluster.Member;
 import io.atomix.cluster.MemberId;
-import io.atomix.cluster.MembershipConfig;
 import io.atomix.cluster.Node;
 import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
 import io.atomix.cluster.impl.DefaultClusterMembershipService;
 import io.atomix.cluster.impl.DefaultNodeDiscoveryService;
 import io.atomix.cluster.messaging.BroadcastService;
+import io.atomix.cluster.messaging.MessagingConfig;
 import io.atomix.cluster.messaging.MessagingService;
+import io.atomix.cluster.messaging.UnicastService;
 import io.atomix.cluster.messaging.impl.NettyMessagingService;
+import io.atomix.cluster.protocol.HeartbeatMembershipProtocol;
+import io.atomix.cluster.protocol.HeartbeatMembershipProtocolConfig;
 import io.atomix.primitive.PrimitiveState;
 import io.atomix.protocols.raft.RaftServer;
 import io.atomix.protocols.raft.protocol.RaftServerProtocol;
 import io.atomix.protocols.raft.storage.RaftStorage;
 import io.atomix.storage.StorageLevel;
+import io.atomix.utils.Version;
 import io.atomix.utils.net.Address;
 import org.apache.commons.lang.StringUtils;
 import org.apache.submarine.commons.cluster.meta.ClusterMeta;
@@ -151,14 +155,15 @@ public class ClusterServer extends ClusterManager {
     new Thread(new Runnable() {
       @Override
       public void run() {
-        LOG.info("RaftServer run() >>>");
+        LOG.info("RaftServer run({}:{}) >>>", serverHost, raftServerPort);
 
         Address address = Address.from(serverHost, raftServerPort);
         Member member = Member.builder(MemberId.from(serverHost + ":" + 
raftServerPort))
             .withAddress(address)
             .build();
-        messagingService = NettyMessagingService.builder()
-            .withAddress(address).build().start().join();
+        messagingService = new NettyMessagingService(
+            MESSAGING_SERVICE_NAME, member.address(), new 
MessagingConfig()).start().join();
+
         RaftServerProtocol protocol = new RaftServerMessagingProtocol(
             messagingService, ClusterManager.protocolSerializer, 
raftAddressMap::get);
 
@@ -169,6 +174,11 @@ public class ClusterServer extends ClusterManager {
           }
 
           @Override
+          public UnicastService getUnicastService() {
+            return new UnicastServiceAdapter();
+          }
+
+          @Override
           public BroadcastService getBroadcastService() {
             return new BroadcastServiceAdapter();
           }
@@ -176,10 +186,11 @@ public class ClusterServer extends ClusterManager {
 
         ManagedClusterMembershipService clusterService = new 
DefaultClusterMembershipService(
             member,
-            new DefaultNodeDiscoveryService(bootstrapService, member,
-                new BootstrapDiscoveryProvider(clusterNodes)),
+            Version.from("1.0.0"),
+            new DefaultNodeDiscoveryService(
+                bootstrapService, member, new 
BootstrapDiscoveryProvider(clusterNodes)),
             bootstrapService,
-            new MembershipConfig());
+            new HeartbeatMembershipProtocol(new 
HeartbeatMembershipProtocolConfig()));
 
         File atomixDateDir = com.google.common.io.Files.createTempDir();
         atomixDateDir.deleteOnExit();
@@ -190,7 +201,7 @@ public class ClusterServer extends ClusterManager {
             .withStorage(RaftStorage.builder()
                 .withStorageLevel(StorageLevel.MEMORY)
                 .withDirectory(atomixDateDir)
-                .withSerializer(storageSerializer)
+                .withNamespace(storageNamespace)
                 .withMaxSegmentSize(1024 * 1024)
                 .build());
 
diff --git 
a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/UnicastServiceAdapter.java
 
b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/UnicastServiceAdapter.java
new file mode 100644
index 0000000..ec3db26
--- /dev/null
+++ 
b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/UnicastServiceAdapter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.submarine.commons.cluster;
+
+import io.atomix.cluster.messaging.UnicastService;
+import io.atomix.utils.net.Address;
+
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+
+public class UnicastServiceAdapter implements UnicastService {
+  @Override
+  public void unicast(Address address, String subject, byte[] message) {
+
+  }
+
+  @Override
+  public void addListener(String subject, BiConsumer<Address, byte[]> 
listener, Executor executor) {
+
+  }
+
+  @Override
+  public void removeListener(String subject, BiConsumer<Address, byte[]> 
listener) {
+
+  }
+}
diff --git a/submarine-commons/commons-metastore/pom.xml 
b/submarine-commons/commons-metastore/pom.xml
index dd7b685..7f29111 100644
--- a/submarine-commons/commons-metastore/pom.xml
+++ b/submarine-commons/commons-metastore/pom.xml
@@ -330,6 +330,14 @@
                   <pattern>com.google</pattern>
                   
<shadedPattern>${shaded.dependency.prefix}.com.google</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>org.eclipse.jetty</pattern>
+                  
<shadedPattern>${shaded.dependency.prefix}.org.eclipse.jetty</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>javax.ws.rs.core</pattern>
+                  
<shadedPattern>${shaded.dependency.prefix}.javax.ws.rs.core</shadedPattern>
+                </relocation>
               </relocations>
             </configuration>
           </execution>
diff --git a/submarine-dist/src/assembly/distribution.xml 
b/submarine-dist/src/assembly/distribution.xml
index bbb0c2e..ed93d05 100644
--- a/submarine-dist/src/assembly/distribution.xml
+++ b/submarine-dist/src/assembly/distribution.xml
@@ -91,7 +91,14 @@
       <directory>../submarine-commons/commons-cluster/target</directory>
       <outputDirectory>/lib</outputDirectory>
       <includes>
-        <include>submarine-commons-cluster-${project.version}.jar</include>
+        
<include>submarine-commons-cluster-${project.version}-shade.jar</include>
+      </includes>
+    </fileSet>
+    <fileSet>
+      <directory>../submarine-commons/commons-metastore/target</directory>
+      <outputDirectory>/lib</outputDirectory>
+      <includes>
+        
<include>submarine-commons-metastore-${project.version}-shade.jar</include>
       </includes>
     </fileSet>
     <fileSet>
@@ -152,10 +159,14 @@
         <exclude>commons-runtime-${project.version}.jar</exclude>
         <exclude>commons-cluster-${project.version}.jar</exclude>
         <exclude>commons-rpc-${project.version}.jar</exclude>
+        <exclude>commons-metastore-${project.version}.jar</exclude>
         <exclude>grpc-*.jar</exclude>
         <exclude>protobuf-java*.jar</exclude>
         <!-- mysql-connector-java uses the GPL license. So we need exclude 
mysql-connector-java jar -->
         <exclude>mysql-connector-java-*.jar</exclude>
+        <!-- atomix & netty-4.1.27.Final already shade in 
submarine-commons-cluster-${project.version}-shade.jar -->
+        <exclude>atomix-*.jar</exclude>
+        <exclude>netty-*-4.1.27.Final*.jar</exclude>
       </excludes>
     </fileSet>
     <fileSet>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to