This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new 72e4fd70c [Feature][st-engine] add server in NodeExtension (#2195)
72e4fd70c is described below
commit 72e4fd70cf9b28d4ec06a8d6268b3a5a8abe0590
Author: ic4y <[email protected]>
AuthorDate: Tue Jul 19 19:18:18 2022 +0800
[Feature][st-engine] add server in NodeExtension (#2195)
* add server in NodeExtension
* Add return value
* Add license
* hazelcast upgraded to 5.1
---
pom.xml | 8 ++
seatunnel-dist/release-docs/LICENSE | 1 +
seatunnel-dist/release-docs/NOTICE | 52 +++++++++++
seatunnel-dist/src/main/assembly/assembly-bin.xml | 10 ++
seatunnel-engine/pom.xml | 46 ++++++++++
.../seatunnel/engine/server/NodeContext.java | 30 ++++++
.../seatunnel/engine/server/NodeExtension.java | 67 ++++++++++++++
.../engine/server/NodeExtensionCommon.java | 101 +++++++++++++++++++++
.../org/apache/seatunnel/engine/server/Server.java | 81 +++++++++++++++++
.../seatunnel/engine/server/ServerStarter.java | 29 ++++++
.../com.hazelcast.instance.impl.NodeExtension | 15 +++
tools/dependencies/known-dependencies.txt | 3 +-
12 files changed, 442 insertions(+), 1 deletion(-)
diff --git a/pom.xml b/pom.xml
index bc3300d55..d0e5e1eb3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
<module>seatunnel-formats</module>
<module>seatunnel-dist</module>
<module>seatunnel-server</module>
+ <module>seatunnel-engine</module>
</modules>
<profiles>
@@ -220,6 +221,7 @@
<springfox-swagger.version>2.6.1</springfox-swagger.version>
<swagger-annotations.version>1.5.10</swagger-annotations.version>
<hibernate.validator.version>6.2.2.Final</hibernate.validator.version>
+ <hazelcast.version>5.1</hazelcast.version>
</properties>
<dependencyManagement>
@@ -934,6 +936,12 @@
<version>${hibernate.validator.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast</artifactId>
+ <version>${hazelcast.version}</version>
+ </dependency>
+
</dependencies>
</dependencyManagement>
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index 8191c55a0..20f6de19e 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -938,6 +938,7 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) springfox-swagger-common (io.springfox:springfox-swagger-common:2.6.1 - https://github.com/springfox/springfox)
(The Apache Software License, Version 2.0) springfox-swagger-ui (io.springfox:springfox-swagger-ui:2.6.1 - https://github.com/springfox/springfox)
(The Apache Software License, Version 2.0) springfox-swagger2 (io.springfox:springfox-swagger2:2.6.1 - https://github.com/springfox/springfox)
+ (The Apache Software License, Version 2.0) hazelcast (com.hazelcast:hazelcast:5.1 - https://github.com/hazelcast/hazelcast)
========================================================================
diff --git a/seatunnel-dist/release-docs/NOTICE b/seatunnel-dist/release-docs/NOTICE
index 2489bae1e..0f4b9da9d 100644
--- a/seatunnel-dist/release-docs/NOTICE
+++ b/seatunnel-dist/release-docs/NOTICE
@@ -4445,4 +4445,56 @@ Copyright 2000-2019 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
+=========================================================================
+
+hazelcast NOTICE
+
+=========================================================================
+The packages:
+
+com.hazelcast.internal.util.collection
+com.hazelcast.internal.util.concurrent
+
+and the classes:
+
+com.hazelcast.internal.util.QuickMath
+com.hazelcast.client.impl.protocol.util.UnsafeBuffer
+com.hazelcast.client.impl.protocol.util.BufferBuilder
+
+contain code originating from the Agrona project
+(https://github.com/real-logic/Agrona).
+
+The class com.hazelcast.internal.util.HashUtil contains code originating
+from the Koloboke project (https://github.com/OpenHFT/Koloboke).
+
+The class classloading.ThreadLocalLeakTestUtils contains code originating
+from the Tomcat project (https://github.com/apache/tomcat).
+
+com.hazelcast.internal.cluster.fd.PhiAccrualFailureDetector contains code originating
+from the Akka project (https://github.com/akka/akka/).
+
+The package com.hazelcast.internal.json contains code originating
+from minimal-json project (https://github.com/ralfstx/minimal-json).
+
+The class com.hazelcast.instance.impl.MobyNames contains code originating
+from The Moby Project (https://github.com/moby/moby).
+
+The class com.hazelcast.internal.util.graph.BronKerboschCliqueFinder contains code
+originating from The JGraphT Project (https://github.com/jgrapht/jgrapht).
+
+The packages:
+com.hazelcast.sql
+com.hazelcast.jet.sql
+
+contain code originating from the Apache Calcite (https://github.com/apache/calcite)
+
+The class com.hazelcast.jet.kafka.impl.ResumeTransactionUtil contains
+code derived from the Apache Flink project.
+
+The class com.hazelcast.internal.util.ConcurrentReferenceHashMap contains code written by Doug Lea
+and updated within the WildFly project (https://github.com/wildfly/wildfly).
+
+The class org.apache.calcite.linq4j.tree.ConstantExpression contains code
+originating from the Calcite project (https://github.com/apache/calcite).
+
=========================================================================
\ No newline at end of file
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml b/seatunnel-dist/src/main/assembly/assembly-bin.xml
index d68a147da..bd2841102 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml
@@ -118,6 +118,16 @@
</excludes>
<outputDirectory>/lib</outputDirectory>
</fileSet>
+ <fileSet>
+ <directory>../seatunnel-engine/target</directory>
+ <includes>
+ <include>seatunnel-engine*.jar</include>
+ </includes>
+ <excludes>
+ <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
+ </excludes>
+ <outputDirectory>/lib</outputDirectory>
+ </fileSet>
<!-- connectors -->
<fileSet>
<directory>../seatunnel-connectors/seatunnel-connectors-flink-dist/target/lib</directory>
diff --git a/seatunnel-engine/pom.xml b/seatunnel-engine/pom.xml
new file mode 100644
index 000000000..f3064079c
--- /dev/null
+++ b/seatunnel-engine/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel</artifactId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-engine</artifactId>
+ <packaging>pom</packaging>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.hazelcast</groupId>
+ <artifactId>hazelcast</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+
+ </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeContext.java b/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeContext.java
new file mode 100644
index 000000000..05822d331
--- /dev/null
+++ b/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeContext.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import com.hazelcast.instance.impl.DefaultNodeContext;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.instance.impl.NodeExtension;
+
+public class NodeContext extends DefaultNodeContext {
+
+ @Override
+ public NodeExtension createNodeExtension(Node node) {
+ return new org.apache.seatunnel.engine.server.NodeExtension(node);
+ }
+}
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java b/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
new file mode 100644
index 000000000..f583a91ef
--- /dev/null
+++ b/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtension.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import com.hazelcast.cluster.ClusterState;
+import com.hazelcast.instance.impl.DefaultNodeExtension;
+import com.hazelcast.instance.impl.Node;
+
+import java.util.Map;
+
+public class NodeExtension extends DefaultNodeExtension {
+ private final NodeExtensionCommon extCommon;
+
+ public NodeExtension(Node node) {
+ super(node);
+ extCommon = new NodeExtensionCommon(node, new Server(node));
+ }
+
+ @Override
+ public void beforeStart() {
+ // TODO Get Config from Node here
+ super.beforeStart();
+ }
+
+ @Override
+ public void afterStart() {
+ super.afterStart();
+ extCommon.afterStart();
+ }
+
+ @Override
+ public void beforeClusterStateChange(ClusterState currState, ClusterState requestedState, boolean isTransient) {
+ super.beforeClusterStateChange(currState, requestedState, isTransient);
+ extCommon.beforeClusterStateChange(requestedState);
+ }
+
+ @Override
+ public void onClusterStateChange(ClusterState newState, boolean isTransient) {
+ super.onClusterStateChange(newState, isTransient);
+ extCommon.onClusterStateChange(newState);
+ }
+
+ @Override
+ public Map<String, Object> createExtensionServices() {
+ return extCommon.createExtensionServices();
+ }
+
+ @Override
+ public void printNodeInfo() {
+ extCommon.printNodeInfo(systemLogger, "");
+ }
+}
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java b/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
new file mode 100644
index 000000000..8c6ab6dae
--- /dev/null
+++ b/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/NodeExtensionCommon.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import static com.hazelcast.cluster.ClusterState.PASSIVE;
+
+import com.hazelcast.cluster.ClusterState;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+class NodeExtensionCommon {
+ private static final String ST_LOGO =
+ " \n" +
+ " _____ _____ _ \n" +
+ "/ ___| |_ _| | |\n" +
+ "\\ `--. ___ __ _ | | _ _ _ __ _ __ ___ | |\n" +
+ " `--. \\ / _ \\ / _` | | | | | | || '_ \\ | '_ \\ / _ \\| |\n" +
+ "/\\__/ /| __/| (_| | | | | |_| || | | || | | || __/| |\n" +
+ "\\____/ \\___| \\__,_| \\_/ \\__,_||_| |_||_| |_| \\___||_|\n" +
+ " \n";
+ private static final String COPYRIGHT_LINE = "Copyright © 2021-2022 The Apache Software Foundation. Apache SeaTunnel, SeaTunnel, and its feather logo are trademarks of The Apache Software Foundation.";
+
+ private final Node node;
+ private final ILogger logger;
+ private final Server server;
+
+ NodeExtensionCommon(Node node, Server server) {
+ this.node = node;
+ this.logger = node.getLogger(getClass().getName());
+ this.server = server;
+ }
+
+ void afterStart() {
+ //TODO seaTunnelServer after start in here
+ }
+
+ void beforeClusterStateChange(ClusterState requestedState) {
+ if (requestedState != PASSIVE) {
+ return;
+ }
+ logger.info("st is preparing to enter the PASSIVE cluster state");
+ NodeEngineImpl ne = node.nodeEngine;
+ //TODO This is where cluster state changes are handled
+ }
+
+ void onClusterStateChange(ClusterState ignored) {
+ //TODO This is where cluster state changes are handled
+ }
+
+ void printNodeInfo(ILogger log, String addToProductName) {
+ log.info(imdgVersionMessage());
+ log.info(clusterNameMessage());
+ log.fine(serializationVersionMessage());
+ log.info('\n' + ST_LOGO);
+ log.info(COPYRIGHT_LINE);
+ }
+
+ private String imdgVersionMessage() {
+ String build = node.getBuildInfo().getBuild();
+ String revision = node.getBuildInfo().getRevision();
+ if (!revision.isEmpty()) {
+ build += " - " + revision;
+ }
+ return "Based on Hazelcast IMDG version: " + node.getVersion() + " (" + build + ")";
+ }
+
+ private String serializationVersionMessage() {
+ return "Configured Hazelcast Serialization version: " + node.getBuildInfo().getSerializationVersion();
+ }
+
+ private String clusterNameMessage() {
+ return "Cluster name: " + node.getConfig().getClusterName();
+ }
+
+ Map<String, Object> createExtensionServices() {
+ Map<String, Object> extensionServices = new HashMap<>();
+
+ extensionServices.put(Server.SERVICE_NAME, server);
+
+ return extensionServices;
+ }
+}
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/Server.java b/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/Server.java
new file mode 100644
index 000000000..90bbfc7c8
--- /dev/null
+++ b/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/Server.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.services.ManagedService;
+import com.hazelcast.internal.services.MembershipAwareService;
+import com.hazelcast.internal.services.MembershipServiceEvent;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.spi.impl.NodeEngine;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import com.hazelcast.spi.impl.operationservice.LiveOperations;
+import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
+
+import java.util.Properties;
+
+public class Server implements ManagedService, MembershipAwareService, LiveOperationsTracker {
+ public static final String SERVICE_NAME = "st:impl:seaTunnelServer";
+
+ private NodeEngineImpl nodeEngine;
+ private final ILogger logger;
+
+ public Server(Node node) {
+ this.logger = node.getLogger(getClass());
+ logger.info("SeaTunnel server start...");
+ }
+
+ @Override
+ public void init(NodeEngine engine, Properties hzProperties) {
+ this.nodeEngine = (NodeEngineImpl) engine;
+ }
+
+ @Override
+ public void reset() {
+
+ }
+
+ @Override
+ public void shutdown(boolean terminate) {
+
+ }
+
+ @Override
+ public void memberAdded(MembershipServiceEvent event) {
+
+ }
+
+ @Override
+ public void memberRemoved(MembershipServiceEvent event) {
+
+ }
+
+ @Override
+ public void populate(LiveOperations liveOperations) {
+
+ }
+
+ /**
+ * Used for debugging on call
+ */
+ public String printMessage(String message){
+ this.logger.info(nodeEngine.getThisAddress() + ":" + message);
+ return message;
+ }
+
+}
diff --git a/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java b/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
new file mode 100644
index 000000000..5749f4c3a
--- /dev/null
+++ b/seatunnel-engine/src/main/java/org/apache/seatunnel/engine/server/ServerStarter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+
+public class ServerStarter {
+
+ public static void main(String[] args) {
+ Config config = new Config();
+ HazelcastInstanceFactory.newHazelcastInstance(config, Thread.currentThread().getName(), new NodeContext());
+ }
+}
diff --git a/seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension b/seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension
new file mode 100644
index 000000000..972b00a88
--- /dev/null
+++ b/seatunnel-engine/src/main/resources/META-INF/services/com.hazelcast.instance.impl.NodeExtension
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.seatunnel.engine.server.NodeExtension
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index a0a43ed80..dd5dd4c69 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -731,4 +731,5 @@ zookeeper-jute-3.5.9.jar
zstd-jni-1.3.3-1.jar
zstd-jni-1.4.3-1.jar
jakarta.activation-api-1.2.1.jar
-jakarta.xml.bind-api-2.3.2.jar
\ No newline at end of file
+jakarta.xml.bind-api-2.3.2.jar
+hazelcast-5.1.jar
\ No newline at end of file