This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-spi-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new cf5dfeb Add seata implementation (#129)
cf5dfeb is described below
commit cf5dfebac1cb09f5f36dd97a8a3d5d5c2e435516
Author: Albumen Kevin <[email protected]>
AuthorDate: Thu Jul 21 17:19:16 2022 +0800
Add seata implementation (#129)
* Add seata implementation
* exact bom
* fix env
* update seata dependency
---
dubbo-extensions-dependencies-bom/pom.xml | 6 +
.../{ => dubbo-filter-seata}/pom.xml | 19 ++-
.../SeataTransactionPropagationConsumerFilter.java | 55 ++++++++
.../SeataTransactionPropagationProviderFilter.java | 92 +++++++++++++
.../META-INF/dubbo/org.apache.dubbo.rpc.Filter | 2 +
...taTransactionPropagationConsumerFilterTest.java | 67 ++++++++++
...taTransactionPropagationProviderFilterTest.java | 146 +++++++++++++++++++++
dubbo-filter-extensions/pom.xml | 4 +
8 files changed, 388 insertions(+), 3 deletions(-)
diff --git a/dubbo-extensions-dependencies-bom/pom.xml
b/dubbo-extensions-dependencies-bom/pom.xml
index f6babb8..41a1230 100644
--- a/dubbo-extensions-dependencies-bom/pom.xml
+++ b/dubbo-extensions-dependencies-bom/pom.xml
@@ -127,6 +127,7 @@
<consul_version>1.4.2</consul_version>
<consul_client_version>1.3.7</consul_client_version>
<test_container_version>1.15.2</test_container_version>
+ <seata_version>1.5.2</seata_version>
<maven_flatten_version>1.2.5</maven_flatten_version>
</properties>
@@ -421,6 +422,11 @@
<artifactId>testcontainers</artifactId>
<version>${test_container_version}</version>
</dependency>
+ <dependency>
+ <groupId>io.seata</groupId>
+ <artifactId>seata-core</artifactId>
+ <version>${seata_version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/dubbo-filter-extensions/pom.xml
b/dubbo-filter-extensions/dubbo-filter-seata/pom.xml
similarity index 72%
copy from dubbo-filter-extensions/pom.xml
copy to dubbo-filter-extensions/dubbo-filter-seata/pom.xml
index e91ca5c..0079b3d 100644
--- a/dubbo-filter-extensions/pom.xml
+++ b/dubbo-filter-extensions/dubbo-filter-seata/pom.xml
@@ -19,13 +19,26 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
+ <artifactId>dubbo-filter-extensions</artifactId>
<groupId>org.apache.dubbo.extensions</groupId>
- <artifactId>extensions-parent</artifactId>
<version>${revision}</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>dubbo-filter-extensions</artifactId>
- <version>${revision}</version>
+ <artifactId>dubbo-filter-seata</artifactId>
+ <name>${project.artifactId}</name>
+ <version>1.0.0-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.seata</groupId>
+ <artifactId>seata-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo</artifactId>
+ </dependency>
+ </dependencies>
+
</project>
diff --git
a/dubbo-filter-extensions/dubbo-filter-seata/src/main/java/org/apache/dubbo/seata/SeataTransactionPropagationConsumerFilter.java
b/dubbo-filter-extensions/dubbo-filter-seata/src/main/java/org/apache/dubbo/seata/SeataTransactionPropagationConsumerFilter.java
new file mode 100644
index 0000000..4af0622
--- /dev/null
+++
b/dubbo-filter-extensions/dubbo-filter-seata/src/main/java/org/apache/dubbo/seata/SeataTransactionPropagationConsumerFilter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.seata;
+
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.rpc.Filter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+
+import io.seata.core.constants.DubboConstants;
+import io.seata.core.context.RootContext;
+import io.seata.core.model.BranchType;
+
+/**
+ * Consumer-side Dubbo filter that copies the Seata XID (and the branch type, when one is reported) from {@link RootContext} into the outgoing invocation's attachments.
+ */
+@Activate(group = DubboConstants.CONSUMER)
+public class SeataTransactionPropagationConsumerFilter implements Filter {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SeataTransactionPropagationConsumerFilter.class);
+
+ @Override // adds the transaction attachments (if any) and then delegates to the next invoker
+ public Result invoke(Invoker<?> invoker, Invocation invocation) throws
RpcException {
+ String xid = RootContext.getXID(); // null when no XID is currently bound
+ BranchType branchType = RootContext.getBranchType();
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(String.format("Client side xid in RootContext[%s]",
xid));
+ }
+ if (xid != null) { // attachments are written only when an XID is present
+ invocation.setAttachment(RootContext.KEY_XID, xid);
+ if (branchType != null) { // the branch type travels as its enum name
+ invocation.setAttachment(RootContext.KEY_BRANCH_TYPE,
branchType.name());
+ }
+ }
+ return invoker.invoke(invocation); // the call always proceeds, with or without an XID
+ }
+}
diff --git
a/dubbo-filter-extensions/dubbo-filter-seata/src/main/java/org/apache/dubbo/seata/SeataTransactionPropagationProviderFilter.java
b/dubbo-filter-extensions/dubbo-filter-seata/src/main/java/org/apache/dubbo/seata/SeataTransactionPropagationProviderFilter.java
new file mode 100644
index 0000000..e678b11
--- /dev/null
+++
b/dubbo-filter-extensions/dubbo-filter-seata/src/main/java/org/apache/dubbo/seata/SeataTransactionPropagationProviderFilter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.seata;
+
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.rpc.Filter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+
+import io.seata.common.util.StringUtils;
+import io.seata.core.constants.DubboConstants;
+import io.seata.core.context.RootContext;
+import io.seata.core.model.BranchType;
+
+/**
+ * Provider-side Dubbo filter that binds the XID (and the TCC branch type) received in the invocation attachments to {@link RootContext} for the duration of the call and unbinds it afterwards.
+ */
+@Activate(group = DubboConstants.PROVIDER)
+public class SeataTransactionPropagationProviderFilter implements Filter {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SeataTransactionPropagationProviderFilter.class);
+
+ @Override // binds the incoming transaction context, invokes, then cleans up in finally
+ public Result invoke(Invoker<?> invoker, Invocation invocation) throws
RpcException {
+ String rpcXid = invocation.getAttachment(RootContext.KEY_XID);
+ if (rpcXid == null) { // fall back to the lower-cased attachment key
+ rpcXid =
invocation.getAttachment(RootContext.KEY_XID.toLowerCase());
+ }
+ String rpcBranchType =
invocation.getAttachment(RootContext.KEY_BRANCH_TYPE);
+ if (rpcBranchType == null) { // same lower-cased fallback for the branch type
+ rpcBranchType =
invocation.getAttachment(RootContext.KEY_BRANCH_TYPE.toLowerCase());
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Server side xid in RpcContext[" + rpcXid + "]");
+ }
+ boolean bind = false; // becomes true once this filter has bound an XID, so finally knows to clean up
+
+ if (rpcXid != null) {
+ RootContext.bind(rpcXid);
+ if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType)) { // only TCC is bound explicitly; other values are not bound here
+ RootContext.bindBranchType(BranchType.TCC);
+ }
+ bind = true;
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(String.format("bind xid [%s] branchType [%s] to
RootContext", rpcXid, rpcBranchType));
+ }
+ }
+ try {
+ return invoker.invoke(invocation);
+ } finally { // cleanup runs whether the invocation returns or throws
+ if (bind) {
+ BranchType previousBranchType = RootContext.getBranchType(); // captured before unbind() so it can be logged and restored
+ String unbindXid = RootContext.unbind();
+ if (BranchType.TCC == previousBranchType) {
+ RootContext.unbindBranchType();
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(String.format("unbind xid [%s] branchType
[%s] from RootContext", unbindXid, previousBranchType));
+ }
+ if (!rpcXid.equalsIgnoreCase(unbindXid)) { // the XID found at exit is not the one this filter bound
+ LOGGER.warn(String.format("xid in change during RPC from
%s to %s,branchType from %s to %s", rpcXid, unbindXid,
+ rpcBranchType != null ? rpcBranchType : "AT",
previousBranchType));
+ if (unbindXid != null) { // re-bind what the invoked code left behind instead of discarding it
+ RootContext.bind(unbindXid);
+ LOGGER.warn(String.format("bind xid [%s] back to
RootContext", unbindXid));
+ if (BranchType.TCC == previousBranchType) {
+ RootContext.bindBranchType(BranchType.TCC);
+ LOGGER.warn(String.format("bind branchType [%s]
back to RootContext", previousBranchType));
+ }
+ }
+ }
+ }
+ }
+ }
+}
diff --git
a/dubbo-filter-extensions/dubbo-filter-seata/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.Filter
b/dubbo-filter-extensions/dubbo-filter-seata/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.Filter
new file mode 100644
index 0000000..3caed90
--- /dev/null
+++
b/dubbo-filter-extensions/dubbo-filter-seata/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.Filter
@@ -0,0 +1,2 @@
+seata-provider=org.apache.dubbo.seata.SeataTransactionPropagationProviderFilter
+seata-consumer=org.apache.dubbo.seata.SeataTransactionPropagationConsumerFilter
diff --git
a/dubbo-filter-extensions/dubbo-filter-seata/src/test/java/org/apache/dubbo/seata/SeataTransactionPropagationConsumerFilterTest.java
b/dubbo-filter-extensions/dubbo-filter-seata/src/test/java/org/apache/dubbo/seata/SeataTransactionPropagationConsumerFilterTest.java
new file mode 100644
index 0000000..423e094
--- /dev/null
+++
b/dubbo-filter-extensions/dubbo-filter-seata/src/test/java/org/apache/dubbo/seata/SeataTransactionPropagationConsumerFilterTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.seata;
+
+import org.apache.dubbo.rpc.Filter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.model.ModuleModel;
+
+import io.seata.core.context.RootContext;
+import io.seata.core.model.BranchType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class SeataTransactionPropagationConsumerFilterTest { // checks which attachments the consumer filter sets for three RootContext states
+ @Test
+ public void test() { // note: the Mockito verification counts below are cumulative across the three invoke calls
+ ApplicationModel applicationModel =
FrameworkModel.defaultModel().newApplication();
+ ModuleModel moduleModel = applicationModel.newModule();
+
+ Filter filter = moduleModel.getExtension(Filter.class,
"seata-consumer");
+ Assertions.assertTrue(filter instanceof
SeataTransactionPropagationConsumerFilter);
+
+
+ Invoker invoker = Mockito.mock(Invoker.class);
+ Invocation invocation = Mockito.mock(Invocation.class);
+ filter.invoke(invoker, invocation); // case 1: no XID bound, so no attachment is expected
+
+ Mockito.verify(invoker, Mockito.times(1)).invoke(invocation);
+ Mockito.verify(invocation,
Mockito.times(0)).setAttachment(Mockito.any(), Mockito.any());
+
+ RootContext.bind("123456"); // case 2: XID bound, no branch type bound explicitly
+ filter.invoke(invoker, invocation); // expected to add two attachments: the XID plus a branch type (presumably the default RootContext reports - confirm)
+ Mockito.verify(invoker, Mockito.times(2)).invoke(invocation);
+ Mockito.verify(invocation,
Mockito.times(1)).setAttachment(RootContext.KEY_XID, "123456");
+ Mockito.verify(invocation,
Mockito.times(2)).setAttachment(Mockito.any(), Mockito.any());
+
+ RootContext.bind("12345678"); // case 3: a new XID together with an explicit SAGA branch type
+ RootContext.bindBranchType(BranchType.SAGA);
+ filter.invoke(invoker, invocation); // two more attachments, bringing the running total to four
+ Mockito.verify(invoker, Mockito.times(3)).invoke(invocation);
+ Mockito.verify(invocation,
Mockito.times(1)).setAttachment(RootContext.KEY_XID, "12345678");
+ Mockito.verify(invocation,
Mockito.times(1)).setAttachment(RootContext.KEY_BRANCH_TYPE, "SAGA");
+ Mockito.verify(invocation,
Mockito.times(4)).setAttachment(Mockito.any(), Mockito.any());
+
+ RootContext.unbind(); // undo the bindings made above
+ RootContext.unbindBranchType();
+ moduleModel.destroy();
+ }
+}
diff --git
a/dubbo-filter-extensions/dubbo-filter-seata/src/test/java/org/apache/dubbo/seata/SeataTransactionPropagationProviderFilterTest.java
b/dubbo-filter-extensions/dubbo-filter-seata/src/test/java/org/apache/dubbo/seata/SeataTransactionPropagationProviderFilterTest.java
new file mode 100644
index 0000000..a5e0f76
--- /dev/null
+++
b/dubbo-filter-extensions/dubbo-filter-seata/src/test/java/org/apache/dubbo/seata/SeataTransactionPropagationProviderFilterTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.seata;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.Filter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.model.ModuleModel;
+
+import io.seata.core.context.RootContext;
+import io.seata.core.model.BranchType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+public class SeataTransactionPropagationProviderFilterTest { // drives the provider filter through seven attachment / RootContext scenarios
+ private final AtomicReference<Function<Invocation, Result>> invokeFunction
= new AtomicReference<>();
+
+ private Invoker invoker = new Invoker() { // stub invoker: invoke() delegates to whatever function is currently held in invokeFunction
+ @Override
+ public Class getInterface() {
+ return null;
+ }
+
+ @Override
+ public Result invoke(Invocation invocation) throws RpcException {
+ return invokeFunction.get().apply(invocation);
+ }
+
+ @Override
+ public URL getUrl() {
+ return null;
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return false;
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+ };
+
+
+ @Test
+ public void test() { // stubbing on the mocked invocation accumulates from one case to the next
+ ApplicationModel applicationModel =
FrameworkModel.defaultModel().newApplication();
+ ModuleModel moduleModel = applicationModel.newModule();
+
+ Filter filter = moduleModel.getExtension(Filter.class,
"seata-provider");
+ Assertions.assertTrue(filter instanceof
SeataTransactionPropagationProviderFilter);
+
+ Invocation invocation = Mockito.mock(Invocation.class);
+ invokeFunction.set((inv) -> { // case 1: no attachments, so nothing is bound during the call
+ Assertions.assertNull(RootContext.getXID());
+ Assertions.assertNull(RootContext.getBranchType());
+ return null;
+ });
+ filter.invoke(invoker, invocation);
+
+
Mockito.when(invocation.getAttachment(RootContext.KEY_XID)).thenReturn("1234");
+ invokeFunction.set((inv) -> { // case 2: XID under the regular key is bound; branch type is reported as AT
+ Assertions.assertEquals("1234", RootContext.getXID());
+ Assertions.assertEquals(BranchType.AT,
RootContext.getBranchType());
+ return null;
+ });
+ filter.invoke(invoker, invocation);
+
+
Mockito.when(invocation.getAttachment(RootContext.KEY_XID)).thenReturn(null);
+
Mockito.when(invocation.getAttachment(RootContext.KEY_XID.toLowerCase())).thenReturn("123456");
+ invokeFunction.set((inv) -> { // case 3: XID available only under the lower-cased key
+ Assertions.assertEquals("123456", RootContext.getXID());
+ Assertions.assertEquals(BranchType.AT,
RootContext.getBranchType());
+ return null;
+ });
+ filter.invoke(invoker, invocation);
+
+
Mockito.when(invocation.getAttachment(RootContext.KEY_BRANCH_TYPE)).thenReturn("TCC");
+ invokeFunction.set((inv) -> { // case 4: branch type TCC under the regular key
+ Assertions.assertEquals("123456", RootContext.getXID());
+ Assertions.assertEquals(BranchType.TCC,
RootContext.getBranchType());
+ return null;
+ });
+ filter.invoke(invoker, invocation);
+
+
Mockito.when(invocation.getAttachment(RootContext.KEY_BRANCH_TYPE)).thenReturn(null);
+
Mockito.when(invocation.getAttachment(RootContext.KEY_BRANCH_TYPE.toLowerCase())).thenReturn("TCC");
+ invokeFunction.set((inv) -> { // case 5: branch type TCC available only under the lower-cased key
+ Assertions.assertEquals("123456", RootContext.getXID());
+ Assertions.assertEquals(BranchType.TCC,
RootContext.getBranchType());
+ return null;
+ });
+ filter.invoke(invoker, invocation);
+ Assertions.assertNull(RootContext.getXID()); // after the call the filter must have removed its bindings
+ Assertions.assertNull(RootContext.getBranchType());
+
+
Mockito.when(invocation.getAttachment(RootContext.KEY_BRANCH_TYPE.toLowerCase())).thenReturn("AT");
+ invokeFunction.set((inv) -> { // case 6: the invoked code binds a different XID, which the filter is expected to leave bound
+ RootContext.bind("12345678");
+ return null;
+ });
+ filter.invoke(invoker, invocation);
+ Assertions.assertEquals("12345678", RootContext.getXID());
+ Assertions.assertEquals(BranchType.AT, RootContext.getBranchType());
+ RootContext.unbind(); // manual cleanup before the next case
+ RootContext.unbindBranchType();
+
+
Mockito.when(invocation.getAttachment(RootContext.KEY_BRANCH_TYPE.toLowerCase())).thenReturn("AT");
+ invokeFunction.set((inv) -> { // case 7: the invoked code also switches the branch type to TCC; both values are expected to survive
+ RootContext.bind("12345678");
+ RootContext.bindBranchType(BranchType.TCC);
+ return null;
+ });
+ filter.invoke(invoker, invocation);
+ Assertions.assertEquals("12345678", RootContext.getXID());
+ Assertions.assertEquals(BranchType.TCC, RootContext.getBranchType());
+ RootContext.unbind(); // undo the bindings left by case 7
+ RootContext.unbindBranchType();
+
+ moduleModel.destroy();
+ }
+}
diff --git a/dubbo-filter-extensions/pom.xml b/dubbo-filter-extensions/pom.xml
index e91ca5c..29ee705 100644
--- a/dubbo-filter-extensions/pom.xml
+++ b/dubbo-filter-extensions/pom.xml
@@ -27,5 +27,9 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>dubbo-filter-extensions</artifactId>
+ <packaging>pom</packaging>
<version>${revision}</version>
+ <modules>
+ <module>dubbo-filter-seata</module>
+ </modules>
</project>