This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-spi-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new cf5dfeb  Add seata implementation (#129)
cf5dfeb is described below

commit cf5dfebac1cb09f5f36dd97a8a3d5d5c2e435516
Author: Albumen Kevin <[email protected]>
AuthorDate: Thu Jul 21 17:19:16 2022 +0800

    Add seata implementation (#129)
    
    * Add seata implementation
    
    * exact bom
    
    * fix env
    
    * update seata dependency
---
 dubbo-extensions-dependencies-bom/pom.xml          |   6 +
 .../{ => dubbo-filter-seata}/pom.xml               |  19 ++-
 .../SeataTransactionPropagationConsumerFilter.java |  55 ++++++++
 .../SeataTransactionPropagationProviderFilter.java |  92 +++++++++++++
 .../META-INF/dubbo/org.apache.dubbo.rpc.Filter     |   2 +
 ...taTransactionPropagationConsumerFilterTest.java |  67 ++++++++++
 ...taTransactionPropagationProviderFilterTest.java | 146 +++++++++++++++++++++
 dubbo-filter-extensions/pom.xml                    |   4 +
 8 files changed, 388 insertions(+), 3 deletions(-)

diff --git a/dubbo-extensions-dependencies-bom/pom.xml 
b/dubbo-extensions-dependencies-bom/pom.xml
index f6babb8..41a1230 100644
--- a/dubbo-extensions-dependencies-bom/pom.xml
+++ b/dubbo-extensions-dependencies-bom/pom.xml
@@ -127,6 +127,7 @@
         <consul_version>1.4.2</consul_version>
         <consul_client_version>1.3.7</consul_client_version>
         <test_container_version>1.15.2</test_container_version>
+        <seata_version>1.5.2</seata_version>
 
         <maven_flatten_version>1.2.5</maven_flatten_version>
     </properties>
@@ -421,6 +422,11 @@
                 <artifactId>testcontainers</artifactId>
                 <version>${test_container_version}</version>
             </dependency>
+            <dependency>
+                <groupId>io.seata</groupId>
+                <artifactId>seata-core</artifactId>
+                <version>${seata_version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
diff --git a/dubbo-filter-extensions/pom.xml 
b/dubbo-filter-extensions/dubbo-filter-seata/pom.xml
similarity index 72%
copy from dubbo-filter-extensions/pom.xml
copy to dubbo-filter-extensions/dubbo-filter-seata/pom.xml
index e91ca5c..0079b3d 100644
--- a/dubbo-filter-extensions/pom.xml
+++ b/dubbo-filter-extensions/dubbo-filter-seata/pom.xml
@@ -19,13 +19,26 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
+        <artifactId>dubbo-filter-extensions</artifactId>
         <groupId>org.apache.dubbo.extensions</groupId>
-        <artifactId>extensions-parent</artifactId>
         <version>${revision}</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>dubbo-filter-extensions</artifactId>
-    <version>${revision}</version>
+    <artifactId>dubbo-filter-seata</artifactId>
+    <name>${project.artifactId}</name>
+    <version>1.0.0-SNAPSHOT</version>
+
+    <dependencies>
+        <dependency>
+            <groupId>io.seata</groupId>
+            <artifactId>seata-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo</artifactId>
+        </dependency>
+    </dependencies>
+
 </project>
diff --git 
a/dubbo-filter-extensions/dubbo-filter-seata/src/main/java/org/apache/dubbo/seata/SeataTransactionPropagationConsumerFilter.java
 
b/dubbo-filter-extensions/dubbo-filter-seata/src/main/java/org/apache/dubbo/seata/SeataTransactionPropagationConsumerFilter.java
new file mode 100644
index 0000000..4af0622
--- /dev/null
+++ 
b/dubbo-filter-extensions/dubbo-filter-seata/src/main/java/org/apache/dubbo/seata/SeataTransactionPropagationConsumerFilter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.seata;
+
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.rpc.Filter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+
+import io.seata.core.constants.DubboConstants;
+import io.seata.core.context.RootContext;
+import io.seata.core.model.BranchType;
+
+/**
+ * The type Transaction propagation consumer filter.
+ */
+@Activate(group = DubboConstants.CONSUMER)
+public class SeataTransactionPropagationConsumerFilter implements Filter {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SeataTransactionPropagationConsumerFilter.class);
+
+    @Override
+    public Result invoke(Invoker<?> invoker, Invocation invocation) throws 
RpcException {
+        String xid = RootContext.getXID();
+        BranchType branchType = RootContext.getBranchType();
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug(String.format("Client side xid in RootContext[%s]", 
xid));
+        }
+        if (xid != null) {
+            invocation.setAttachment(RootContext.KEY_XID, xid);
+            if (branchType != null) {
+                invocation.setAttachment(RootContext.KEY_BRANCH_TYPE, 
branchType.name());
+            }
+        }
+        return invoker.invoke(invocation);
+    }
+}
diff --git 
a/dubbo-filter-extensions/dubbo-filter-seata/src/main/java/org/apache/dubbo/seata/SeataTransactionPropagationProviderFilter.java
 
b/dubbo-filter-extensions/dubbo-filter-seata/src/main/java/org/apache/dubbo/seata/SeataTransactionPropagationProviderFilter.java
new file mode 100644
index 0000000..e678b11
--- /dev/null
+++ 
b/dubbo-filter-extensions/dubbo-filter-seata/src/main/java/org/apache/dubbo/seata/SeataTransactionPropagationProviderFilter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.seata;
+
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.rpc.Filter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+
+import io.seata.common.util.StringUtils;
+import io.seata.core.constants.DubboConstants;
+import io.seata.core.context.RootContext;
+import io.seata.core.model.BranchType;
+
+/**
+ * The type Transaction propagation provider filter.
+ */
+@Activate(group = DubboConstants.PROVIDER)
+public class SeataTransactionPropagationProviderFilter implements Filter {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SeataTransactionPropagationProviderFilter.class);
+
+    @Override
+    public Result invoke(Invoker<?> invoker, Invocation invocation) throws 
RpcException {
+        String rpcXid = invocation.getAttachment(RootContext.KEY_XID);
+        if (rpcXid == null) {
+            rpcXid = 
invocation.getAttachment(RootContext.KEY_XID.toLowerCase());
+        }
+        String rpcBranchType = 
invocation.getAttachment(RootContext.KEY_BRANCH_TYPE);
+        if (rpcBranchType == null) {
+            rpcBranchType = 
invocation.getAttachment(RootContext.KEY_BRANCH_TYPE.toLowerCase());
+        }
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Server side xid in RpcContext[" + rpcXid + "]");
+        }
+        boolean bind = false;
+
+        if (rpcXid != null) {
+            RootContext.bind(rpcXid);
+            if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType)) {
+                RootContext.bindBranchType(BranchType.TCC);
+            }
+            bind = true;
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug(String.format("bind xid [%s] branchType [%s] to 
RootContext", rpcXid, rpcBranchType));
+            }
+        }
+        try {
+            return invoker.invoke(invocation);
+        } finally {
+            if (bind) {
+                BranchType previousBranchType = RootContext.getBranchType();
+                String unbindXid = RootContext.unbind();
+                if (BranchType.TCC == previousBranchType) {
+                    RootContext.unbindBranchType();
+                }
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug(String.format("unbind xid [%s] branchType 
[%s] from RootContext", unbindXid, previousBranchType));
+                }
+                if (!rpcXid.equalsIgnoreCase(unbindXid)) {
+                    LOGGER.warn(String.format("xid in change during RPC from 
%s to %s,branchType from %s to %s", rpcXid, unbindXid,
+                        rpcBranchType != null ? rpcBranchType : "AT", 
previousBranchType));
+                    if (unbindXid != null) {
+                        RootContext.bind(unbindXid);
+                        LOGGER.warn(String.format("bind xid [%s] back to 
RootContext", unbindXid));
+                        if (BranchType.TCC == previousBranchType) {
+                            RootContext.bindBranchType(BranchType.TCC);
+                            LOGGER.warn(String.format("bind branchType [%s] 
back to RootContext", previousBranchType));
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
diff --git 
a/dubbo-filter-extensions/dubbo-filter-seata/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.Filter
 
b/dubbo-filter-extensions/dubbo-filter-seata/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.Filter
new file mode 100644
index 0000000..3caed90
--- /dev/null
+++ 
b/dubbo-filter-extensions/dubbo-filter-seata/src/main/resources/META-INF/dubbo/org.apache.dubbo.rpc.Filter
@@ -0,0 +1,2 @@
+seata-provider=org.apache.dubbo.seata.SeataTransactionPropagationProviderFilter
+seata-consumer=org.apache.dubbo.seata.SeataTransactionPropagationConsumerFilter
diff --git 
a/dubbo-filter-extensions/dubbo-filter-seata/src/test/java/org/apache/dubbo/seata/SeataTransactionPropagationConsumerFilterTest.java
 
b/dubbo-filter-extensions/dubbo-filter-seata/src/test/java/org/apache/dubbo/seata/SeataTransactionPropagationConsumerFilterTest.java
new file mode 100644
index 0000000..423e094
--- /dev/null
+++ 
b/dubbo-filter-extensions/dubbo-filter-seata/src/test/java/org/apache/dubbo/seata/SeataTransactionPropagationConsumerFilterTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.seata;
+
+import org.apache.dubbo.rpc.Filter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.model.ModuleModel;
+
+import io.seata.core.context.RootContext;
+import io.seata.core.model.BranchType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class SeataTransactionPropagationConsumerFilterTest {
+    @Test
+    public void test() {
+        ApplicationModel applicationModel = 
FrameworkModel.defaultModel().newApplication();
+        ModuleModel moduleModel = applicationModel.newModule();
+
+        Filter filter = moduleModel.getExtension(Filter.class, 
"seata-consumer");
+        Assertions.assertTrue(filter instanceof 
SeataTransactionPropagationConsumerFilter);
+
+
+        Invoker invoker = Mockito.mock(Invoker.class);
+        Invocation invocation = Mockito.mock(Invocation.class);
+        filter.invoke(invoker, invocation);
+
+        Mockito.verify(invoker, Mockito.times(1)).invoke(invocation);
+        Mockito.verify(invocation, 
Mockito.times(0)).setAttachment(Mockito.any(), Mockito.any());
+
+        RootContext.bind("123456");
+        filter.invoke(invoker, invocation);
+        Mockito.verify(invoker, Mockito.times(2)).invoke(invocation);
+        Mockito.verify(invocation, 
Mockito.times(1)).setAttachment(RootContext.KEY_XID, "123456");
+        Mockito.verify(invocation, 
Mockito.times(2)).setAttachment(Mockito.any(), Mockito.any());
+
+        RootContext.bind("12345678");
+        RootContext.bindBranchType(BranchType.SAGA);
+        filter.invoke(invoker, invocation);
+        Mockito.verify(invoker, Mockito.times(3)).invoke(invocation);
+        Mockito.verify(invocation, 
Mockito.times(1)).setAttachment(RootContext.KEY_XID, "12345678");
+        Mockito.verify(invocation, 
Mockito.times(1)).setAttachment(RootContext.KEY_BRANCH_TYPE, "SAGA");
+        Mockito.verify(invocation, 
Mockito.times(4)).setAttachment(Mockito.any(), Mockito.any());
+
+        RootContext.unbind();
+        RootContext.unbindBranchType();
+        moduleModel.destroy();
+    }
+}
diff --git 
a/dubbo-filter-extensions/dubbo-filter-seata/src/test/java/org/apache/dubbo/seata/SeataTransactionPropagationProviderFilterTest.java
 
b/dubbo-filter-extensions/dubbo-filter-seata/src/test/java/org/apache/dubbo/seata/SeataTransactionPropagationProviderFilterTest.java
new file mode 100644
index 0000000..a5e0f76
--- /dev/null
+++ 
b/dubbo-filter-extensions/dubbo-filter-seata/src/test/java/org/apache/dubbo/seata/SeataTransactionPropagationProviderFilterTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.seata;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.rpc.Filter;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.model.ModuleModel;
+
+import io.seata.core.context.RootContext;
+import io.seata.core.model.BranchType;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+public class SeataTransactionPropagationProviderFilterTest {
+    private final AtomicReference<Function<Invocation, Result>> invokeFunction 
= new AtomicReference<>();
+
+    private Invoker invoker = new Invoker() {
+        @Override
+        public Class getInterface() {
+            return null;
+        }
+
+        @Override
+        public Result invoke(Invocation invocation) throws RpcException {
+            return invokeFunction.get().apply(invocation);
+        }
+
+        @Override
+        public URL getUrl() {
+            return null;
+        }
+
+        @Override
+        public boolean isAvailable() {
+            return false;
+        }
+
+        @Override
+        public void destroy() {
+
+        }
+    };
+
+
+    @Test
+    public void test() {
+        ApplicationModel applicationModel = 
FrameworkModel.defaultModel().newApplication();
+        ModuleModel moduleModel = applicationModel.newModule();
+
+        Filter filter = moduleModel.getExtension(Filter.class, 
"seata-provider");
+        Assertions.assertTrue(filter instanceof 
SeataTransactionPropagationProviderFilter);
+
+        Invocation invocation = Mockito.mock(Invocation.class);
+        invokeFunction.set((inv) -> {
+            Assertions.assertNull(RootContext.getXID());
+            Assertions.assertNull(RootContext.getBranchType());
+            return null;
+        });
+        filter.invoke(invoker, invocation);
+
+        
Mockito.when(invocation.getAttachment(RootContext.KEY_XID)).thenReturn("1234");
+        invokeFunction.set((inv) -> {
+            Assertions.assertEquals("1234", RootContext.getXID());
+            Assertions.assertEquals(BranchType.AT, 
RootContext.getBranchType());
+            return null;
+        });
+        filter.invoke(invoker, invocation);
+
+        
Mockito.when(invocation.getAttachment(RootContext.KEY_XID)).thenReturn(null);
+        
Mockito.when(invocation.getAttachment(RootContext.KEY_XID.toLowerCase())).thenReturn("123456");
+        invokeFunction.set((inv) -> {
+            Assertions.assertEquals("123456", RootContext.getXID());
+            Assertions.assertEquals(BranchType.AT, 
RootContext.getBranchType());
+            return null;
+        });
+        filter.invoke(invoker, invocation);
+
+        
Mockito.when(invocation.getAttachment(RootContext.KEY_BRANCH_TYPE)).thenReturn("TCC");
+        invokeFunction.set((inv) -> {
+            Assertions.assertEquals("123456", RootContext.getXID());
+            Assertions.assertEquals(BranchType.TCC, 
RootContext.getBranchType());
+            return null;
+        });
+        filter.invoke(invoker, invocation);
+
+        
Mockito.when(invocation.getAttachment(RootContext.KEY_BRANCH_TYPE)).thenReturn(null);
+        
Mockito.when(invocation.getAttachment(RootContext.KEY_BRANCH_TYPE.toLowerCase())).thenReturn("TCC");
+        invokeFunction.set((inv) -> {
+            Assertions.assertEquals("123456", RootContext.getXID());
+            Assertions.assertEquals(BranchType.TCC, 
RootContext.getBranchType());
+            return null;
+        });
+        filter.invoke(invoker, invocation);
+        Assertions.assertNull(RootContext.getXID());
+        Assertions.assertNull(RootContext.getBranchType());
+
+        
Mockito.when(invocation.getAttachment(RootContext.KEY_BRANCH_TYPE.toLowerCase())).thenReturn("AT");
+        invokeFunction.set((inv) -> {
+            RootContext.bind("12345678");
+            return null;
+        });
+        filter.invoke(invoker, invocation);
+        Assertions.assertEquals("12345678", RootContext.getXID());
+        Assertions.assertEquals(BranchType.AT, RootContext.getBranchType());
+        RootContext.unbind();
+        RootContext.unbindBranchType();
+
+        
Mockito.when(invocation.getAttachment(RootContext.KEY_BRANCH_TYPE.toLowerCase())).thenReturn("AT");
+        invokeFunction.set((inv) -> {
+            RootContext.bind("12345678");
+            RootContext.bindBranchType(BranchType.TCC);
+            return null;
+        });
+        filter.invoke(invoker, invocation);
+        Assertions.assertEquals("12345678", RootContext.getXID());
+        Assertions.assertEquals(BranchType.TCC, RootContext.getBranchType());
+        RootContext.unbind();
+        RootContext.unbindBranchType();
+
+        moduleModel.destroy();
+    }
+}
diff --git a/dubbo-filter-extensions/pom.xml b/dubbo-filter-extensions/pom.xml
index e91ca5c..29ee705 100644
--- a/dubbo-filter-extensions/pom.xml
+++ b/dubbo-filter-extensions/pom.xml
@@ -27,5 +27,9 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>dubbo-filter-extensions</artifactId>
+    <packaging>pom</packaging>
     <version>${revision}</version>
+    <modules>
+        <module>dubbo-filter-seata</module>
+    </modules>
 </project>

Reply via email to