This is an automated email from the ASF dual-hosted git repository.
liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 7dba236 SUBMARINE-373. Upgrade cluster module dependent library
7dba236 is described below
commit 7dba2360d8c3e9a005f1b31666801c5bf9b8d34b
Author: Xun Liu <[email protected]>
AuthorDate: Tue Feb 4 23:17:07 2020 +0800
SUBMARINE-373. Upgrade cluster module dependent library
### What is this PR for?
The Atomix Raft library used by the Submarine cluster module is version 3.0,
which needs the list of all server IPs in the cluster in order to create a
cluster. In a k8s environment, since the IP of a pod is not fixed, we need to
upgrade to Atomix 3.1.0+ and create the cluster through broadcast
discovery.
### What type of PR is it?
[Refactoring]
### Todos
* [ ] - Create a cluster through broadcast discovery.
### What is the Jira issue?
* https://issues.apache.org/jira/browse/SUBMARINE-373
### How should this be tested?
* https://travis-ci.org/liuxunorg/submarine/builds/645828692
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Xun Liu <[email protected]>
Closes #172 from liuxunorg/SUBMARINE-373 and squashes the following commits:
55addda [Xun Liu] relocation
43c3667 [Xun Liu] fixed exclude netty problem
197c86f [Xun Liu] SUBMARINE-373. Upgrade cluster module dependent library
version
---
pom.xml | 5 ++-
.../submarine/commons/cluster/ClusterManager.java | 51 ++++++++++++++++------
.../submarine/commons/cluster/ClusterServer.java | 27 ++++++++----
.../commons/cluster/UnicastServiceAdapter.java | 42 ++++++++++++++++++
submarine-commons/commons-metastore/pom.xml | 8 ++++
submarine-dist/src/assembly/distribution.xml | 13 +++++-
6 files changed, 122 insertions(+), 24 deletions(-)
diff --git a/pom.xml b/pom.xml
index 99f1b60..4059689 100644
--- a/pom.xml
+++ b/pom.xml
@@ -124,7 +124,7 @@
<derby.version>10.15.1.3</derby.version>
<zeppelin.version>0.9.0-SNAPSHOT</zeppelin.version>
<jgit.version>5.5.1.201910021850-r</jgit.version>
- <atomix.version>3.0.0-rc4</atomix.version>
+ <atomix.version>3.1.5</atomix.version>
<spark.scala.version>2.11.8</spark.scala.version>
<spark.scala.binary.version>2.11</spark.scala.binary.version>
<hive.version>2.3.6</hive.version>
@@ -138,7 +138,8 @@
<modules>
<module>submarine-commons</module>
<module>submarine-client</module>
- <module>submarine-cloud</module>
+ <!-- There is a problem with the submarine-cloud pom, the compilation is
very slow, and it is temporarily closed
+ <module>submarine-cloud</module-->
<module>submodules/tony</module>
<module>submarine-server</module>
<module>submarine-all</module>
diff --git
a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
index 508bd57..1bfc2bf 100644
---
a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
+++
b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterManager.java
@@ -19,6 +19,7 @@ package org.apache.submarine.commons.cluster;
import com.google.common.collect.Maps;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.Node;
+import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.primitive.operation.OperationType;
@@ -149,13 +150,15 @@ public abstract class ClusterManager {
protected ClusterMonitor clusterMonitor = null;
+ protected String MESSAGING_SERVICE_NAME = "SubmarineCluster";
+
protected boolean isTest = false;
protected ClusterManager() {
try {
this.serverHost = NetworkUtils.findAvailableHostAddress();
String clusterAddr = sconf.getClusterAddress();
- LOG.info(this.getClass().toString() + "::clusterAddr = {}", clusterAddr);
+ LOG.info("clusterAddr = {}", clusterAddr);
if (!StringUtils.isEmpty(clusterAddr)) {
String cluster[] = clusterAddr.split(",");
@@ -204,13 +207,13 @@ public abstract class ClusterManager {
return;
}
- LOG.info(this.getClass().toString() + "::ClusterManager::start()");
+ LOG.info("ClusterManager::start()");
// RaftClient Thread
new Thread(new Runnable() {
@Override
public void run() {
- LOG.info(this.getClass().toString() + "::RaftClientThread run() >>>");
+ LOG.info("RaftClientThread run() >>>");
int raftClientPort = 0;
try {
@@ -219,12 +222,13 @@ public abstract class ClusterManager {
LOG.error(e.getMessage(), e);
}
+ LOG.info("RaftClientThread {}:{}", serverHost, raftClientPort);
MemberId memberId = MemberId.from(serverHost + ":" + raftClientPort);
Address address = Address.from(serverHost, raftClientPort);
raftAddressMap.put(memberId, address);
- MessagingService messagingManager
- =
NettyMessagingService.builder().withAddress(address).build().start().join();
+ MessagingService messagingManager = new NettyMessagingService(
+ MESSAGING_SERVICE_NAME, address, new
MessagingConfig()).start().join();
RaftClientProtocol protocol = new RaftClientMessagingProtocol(
messagingManager, protocolSerializer, raftAddressMap::get);
@@ -238,7 +242,7 @@ public abstract class ClusterManager {
raftSessionClient = createProxy(raftClient);
- LOG.info(this.getClass().toString() + "::RaftClientThread run() <<<");
+ LOG.info("RaftClientThread run() <<<");
}
}).start();
@@ -255,8 +259,7 @@ public abstract class ClusterManager {
while (!raftInitialized()) {
retry++;
if (0 == retry % 30) {
- LOG.warn(this.getClass().toString()
- + "::Raft incomplete initialization! retry[{}]", retry);
+ LOG.warn("Raft incomplete initialization! retry[{}]", retry);
}
Thread.sleep(100);
}
@@ -272,11 +275,9 @@ public abstract class ClusterManager {
if (true == success) {
// The operation was successfully deleted
clusterMetaQueue.remove(metaEntity);
- LOG.info(this.getClass().toString()
- + "::Cluster Meta Consume success! {}", metaEntity);
+ LOG.info("Cluster Meta Consume success! {}", metaEntity);
} else {
- LOG.error(this.getClass().toString()
- + "::Cluster Meta Consume faild!");
+ LOG.error("Cluster Meta Consume faild!");
}
} else {
Thread.sleep(100);
@@ -424,12 +425,36 @@ public abstract class ClusterManager {
}
if (LOG.isDebugEnabled()) {
- LOG.debug(this.getClass().toString() + "::getClusterMeta >>> {}",
clusterMeta.toString());
+ LOG.debug("getClusterMeta >>> {}", clusterMeta.toString());
}
return clusterMeta;
}
+ protected static final Namespace storageNamespace = Namespace.builder()
+ .register(CloseSessionEntry.class)
+ .register(CommandEntry.class)
+ .register(ConfigurationEntry.class)
+ .register(InitializeEntry.class)
+ .register(KeepAliveEntry.class)
+ .register(MetadataEntry.class)
+ .register(OpenSessionEntry.class)
+ .register(QueryEntry.class)
+ .register(PrimitiveOperation.class)
+ .register(DefaultOperationId.class)
+ .register(OperationType.class)
+ .register(ReadConsistency.class)
+ .register(ArrayList.class)
+ .register(HashSet.class)
+ .register(DefaultRaftMember.class)
+ .register(MemberId.class)
+ .register(RaftMember.Type.class)
+ .register(Instant.class)
+ .register(Configuration.class)
+ .register(byte[].class)
+ .register(long[].class)
+ .build();
+
protected static final Serializer protocolSerializer =
Serializer.using(Namespace.builder()
.register(OpenSessionRequest.class)
.register(OpenSessionResponse.class)
diff --git
a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterServer.java
b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterServer.java
index 47300dc..d622f0c 100644
---
a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterServer.java
+++
b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/ClusterServer.java
@@ -21,19 +21,23 @@ import io.atomix.cluster.BootstrapService;
import io.atomix.cluster.ManagedClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
-import io.atomix.cluster.MembershipConfig;
import io.atomix.cluster.Node;
import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
import io.atomix.cluster.impl.DefaultClusterMembershipService;
import io.atomix.cluster.impl.DefaultNodeDiscoveryService;
import io.atomix.cluster.messaging.BroadcastService;
+import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.MessagingService;
+import io.atomix.cluster.messaging.UnicastService;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
+import io.atomix.cluster.protocol.HeartbeatMembershipProtocol;
+import io.atomix.cluster.protocol.HeartbeatMembershipProtocolConfig;
import io.atomix.primitive.PrimitiveState;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.protocol.RaftServerProtocol;
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
+import io.atomix.utils.Version;
import io.atomix.utils.net.Address;
import org.apache.commons.lang.StringUtils;
import org.apache.submarine.commons.cluster.meta.ClusterMeta;
@@ -151,14 +155,15 @@ public class ClusterServer extends ClusterManager {
new Thread(new Runnable() {
@Override
public void run() {
- LOG.info("RaftServer run() >>>");
+ LOG.info("RaftServer run({}:{}) >>>", serverHost, raftServerPort);
Address address = Address.from(serverHost, raftServerPort);
Member member = Member.builder(MemberId.from(serverHost + ":" +
raftServerPort))
.withAddress(address)
.build();
- messagingService = NettyMessagingService.builder()
- .withAddress(address).build().start().join();
+ messagingService = new NettyMessagingService(
+ MESSAGING_SERVICE_NAME, member.address(), new
MessagingConfig()).start().join();
+
RaftServerProtocol protocol = new RaftServerMessagingProtocol(
messagingService, ClusterManager.protocolSerializer,
raftAddressMap::get);
@@ -169,6 +174,11 @@ public class ClusterServer extends ClusterManager {
}
@Override
+ public UnicastService getUnicastService() {
+ return new UnicastServiceAdapter();
+ }
+
+ @Override
public BroadcastService getBroadcastService() {
return new BroadcastServiceAdapter();
}
@@ -176,10 +186,11 @@ public class ClusterServer extends ClusterManager {
ManagedClusterMembershipService clusterService = new
DefaultClusterMembershipService(
member,
- new DefaultNodeDiscoveryService(bootstrapService, member,
- new BootstrapDiscoveryProvider(clusterNodes)),
+ Version.from("1.0.0"),
+ new DefaultNodeDiscoveryService(
+ bootstrapService, member, new
BootstrapDiscoveryProvider(clusterNodes)),
bootstrapService,
- new MembershipConfig());
+ new HeartbeatMembershipProtocol(new
HeartbeatMembershipProtocolConfig()));
File atomixDateDir = com.google.common.io.Files.createTempDir();
atomixDateDir.deleteOnExit();
@@ -190,7 +201,7 @@ public class ClusterServer extends ClusterManager {
.withStorage(RaftStorage.builder()
.withStorageLevel(StorageLevel.MEMORY)
.withDirectory(atomixDateDir)
- .withSerializer(storageSerializer)
+ .withNamespace(storageNamespace)
.withMaxSegmentSize(1024 * 1024)
.build());
diff --git
a/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/UnicastServiceAdapter.java
b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/UnicastServiceAdapter.java
new file mode 100644
index 0000000..ec3db26
--- /dev/null
+++
b/submarine-commons/commons-cluster/src/main/java/org/apache/submarine/commons/cluster/UnicastServiceAdapter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.submarine.commons.cluster;
+
+import io.atomix.cluster.messaging.UnicastService;
+import io.atomix.utils.net.Address;
+
+import java.util.concurrent.Executor;
+import java.util.function.BiConsumer;
+
+public class UnicastServiceAdapter implements UnicastService {
+ @Override
+ public void unicast(Address address, String subject, byte[] message) {
+
+ }
+
+ @Override
+ public void addListener(String subject, BiConsumer<Address, byte[]>
listener, Executor executor) {
+
+ }
+
+ @Override
+ public void removeListener(String subject, BiConsumer<Address, byte[]>
listener) {
+
+ }
+}
diff --git a/submarine-commons/commons-metastore/pom.xml
b/submarine-commons/commons-metastore/pom.xml
index dd7b685..7f29111 100644
--- a/submarine-commons/commons-metastore/pom.xml
+++ b/submarine-commons/commons-metastore/pom.xml
@@ -330,6 +330,14 @@
<pattern>com.google</pattern>
<shadedPattern>${shaded.dependency.prefix}.com.google</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>org.eclipse.jetty</pattern>
+
<shadedPattern>${shaded.dependency.prefix}.org.eclipse.jetty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>javax.ws.rs.core</pattern>
+
<shadedPattern>${shaded.dependency.prefix}.javax.ws.rs.core</shadedPattern>
+ </relocation>
</relocations>
</configuration>
</execution>
diff --git a/submarine-dist/src/assembly/distribution.xml
b/submarine-dist/src/assembly/distribution.xml
index bbb0c2e..ed93d05 100644
--- a/submarine-dist/src/assembly/distribution.xml
+++ b/submarine-dist/src/assembly/distribution.xml
@@ -91,7 +91,14 @@
<directory>../submarine-commons/commons-cluster/target</directory>
<outputDirectory>/lib</outputDirectory>
<includes>
- <include>submarine-commons-cluster-${project.version}.jar</include>
+
<include>submarine-commons-cluster-${project.version}-shade.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>../submarine-commons/commons-metastore/target</directory>
+ <outputDirectory>/lib</outputDirectory>
+ <includes>
+
<include>submarine-commons-metastore-${project.version}-shade.jar</include>
</includes>
</fileSet>
<fileSet>
@@ -152,10 +159,14 @@
<exclude>commons-runtime-${project.version}.jar</exclude>
<exclude>commons-cluster-${project.version}.jar</exclude>
<exclude>commons-rpc-${project.version}.jar</exclude>
+ <exclude>commons-metastore-${project.version}.jar</exclude>
<exclude>grpc-*.jar</exclude>
<exclude>protobuf-java*.jar</exclude>
<!-- mysql-connector-java uses the GPL license. So we need exclude
mysql-connector-java jar -->
<exclude>mysql-connector-java-*.jar</exclude>
+ <!-- atomix & netty-4.1.27.Final already shade in
submarine-commons-cluster-${project.version}-shade.jar -->
+ <exclude>atomix-*.jar</exclude>
+ <exclude>netty-*-4.1.27.Final*.jar</exclude>
</excludes>
</fileSet>
<fileSet>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]