This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git

commit 314a271dbfb7e5f5f8bf4ca1e15278da97fa7bb8
Author: seanyinx <sean....@huawei.com>
AuthorDate: Tue Jan 2 10:49:49 2018 +0800

    SCB-149 distinguished omega callbacks by service name and instance id
    
    Signed-off-by: seanyinx <sean....@huawei.com>
---
 alpha/alpha-core/pom.xml                           |   9 ++
 .../saga/alpha/core/AlphaException.java            |  24 ++++
 .../saga/alpha/core/CompositeOmegaCallback.java    |  44 ++++++++
 .../saga/alpha/core/RetryOmegaCallback.java        |  71 ++++++++++++
 .../alpha/core/CompositeOmegaCallbackTest.java     | 122 +++++++++++++++++++++
 .../saga/alpha/core/RetryOmegaCallbackTest.java    |  79 +++++++++++++
 .../alpha-core/src/test/resources/log4j2-test.xml  |  30 +++++
 .../servicecomb/saga/alpha/server/AlphaConfig.java |  21 +++-
 8 files changed, 395 insertions(+), 5 deletions(-)

diff --git a/alpha/alpha-core/pom.xml b/alpha/alpha-core/pom.xml
index 92f22f6..93c810f 100644
--- a/alpha/alpha-core/pom.xml
+++ b/alpha/alpha-core/pom.xml
@@ -29,6 +29,11 @@
   <artifactId>alpha-core</artifactId>
   <dependencies>
     <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
     </dependency>
@@ -44,6 +49,10 @@
       <groupId>org.awaitility</groupId>
       <artifactId>awaitility</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+    </dependency>
   </dependencies>
 
 
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/AlphaException.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/AlphaException.java
new file mode 100644
index 0000000..a5eb3c4
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/AlphaException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+/**
+ * Unchecked exception signalling a failure inside alpha, for example when no
+ * omega callback can be found for a service.
+ */
+public class AlphaException extends RuntimeException {
+  /**
+   * @param message description of what went wrong
+   */
+  public AlphaException(String message) {
+    super(message);
+  }
+
+  /**
+   * @param message description of what went wrong
+   * @param cause the underlying failure, kept so its stack trace is not lost
+   */
+  public AlphaException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallback.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
new file mode 100644
index 0000000..e5c4b12
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import java.util.Map;
+
+/**
+ * Dispatches a compensation request to the omega callback registered for the
+ * event's service name and instance id.
+ *
+ * <p>Callbacks are looked up in a two-level map: first by service name, then by
+ * instance id. If the exact instance is not registered, another registered
+ * instance of the same service is used instead.
+ */
+public class CompositeOmegaCallback implements OmegaCallback {
+  private final Map<String, Map<String, OmegaCallback>> callbacks;
+
+  /**
+   * @param callbacks registry of callbacks keyed by service name, then by instance id;
+   *     the map is kept by reference, so later changes to it are visible here
+   */
+  public CompositeOmegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) {
+    this.callbacks = callbacks;
+  }
+
+  /**
+   * Compensates the event on its own instance, or on another instance of the
+   * same service when that instance is not registered.
+   *
+   * @throws AlphaException if the service is unknown or has no registered instance
+   */
+  @Override
+  public void compensate(TxEvent event) {
+    Map<String, OmegaCallback> serviceCallbacks = callbacks.get(event.serviceName());
+
+    // a service that was never registered yields null, not an empty map
+    if (serviceCallbacks == null || serviceCallbacks.isEmpty()) {
+      throw new AlphaException("No such omega callback found for service " + event.serviceName());
+    }
+
+    OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
+    if (omegaCallback == null) {
+      // requested instance is not registered; fall back to whichever instance iterates first
+      serviceCallbacks.values().iterator().next().compensate(event);
+    } else {
+      omegaCallback.compensate(event);
+    }
+  }
+}
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java
new file mode 100644
index 0000000..6f1a7dd
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decorates another {@link OmegaCallback} and keeps re-invoking it until it
+ * completes without throwing or the calling thread is interrupted.
+ */
+public class RetryOmegaCallback implements OmegaCallback {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final String ERROR_MESSAGE =
+      "Failed to compensate service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]";
+
+  private final OmegaCallback underlying;
+  private final int delay;
+
+  /**
+   * @param underlying callback that performs the actual compensation
+   * @param delay pause between two attempts, in milliseconds
+   */
+  public RetryOmegaCallback(OmegaCallback underlying, int delay) {
+    this.underlying = underlying;
+    this.delay = delay;
+  }
+
+  /**
+   * Invokes the underlying callback at least once, retrying after each failure
+   * until an attempt succeeds or the current thread's interrupt flag is set.
+   */
+  @Override
+  public void compensate(TxEvent event) {
+    while (true) {
+      try {
+        underlying.compensate(event);
+        return;
+      } catch (Exception failure) {
+        logError(ERROR_MESSAGE, event, failure);
+        pauseBeforeRetry(event);
+      }
+
+      // give up silently once the thread has been asked to stop
+      if (Thread.currentThread().isInterrupted()) {
+        return;
+      }
+    }
+  }
+
+  /** Waits for the configured delay; an interruption is logged and the interrupt flag restored. */
+  private void pauseBeforeRetry(TxEvent event) {
+    try {
+      TimeUnit.MILLISECONDS.sleep(delay);
+    } catch (InterruptedException interruption) {
+      logError(ERROR_MESSAGE + " due to interruption", event, interruption);
+
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /** Logs the message with the event's identifying fields, passing the exception as the last argument. */
+  private void logError(String message, TxEvent event, Exception e) {
+    log.error(
+        message,
+        event.serviceName(),
+        event.instanceId(),
+        event.compensationMethod(),
+        event.globalTxId(),
+        event.localTxId(),
+        e);
+  }
+}
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
new file mode 100644
index 0000000..38e04a3
--- /dev/null
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Verifies that {@link CompositeOmegaCallback} routes a compensation request to the
+ * callback registered for the event's service and instance.
+ */
+public class CompositeOmegaCallbackTest {
+
+  private final OmegaCallback callback1One = Mockito.mock(OmegaCallback.class);
+  private final OmegaCallback callback1Two = Mockito.mock(OmegaCallback.class);
+
+  private final OmegaCallback callback2One = Mockito.mock(OmegaCallback.class);
+  private final OmegaCallback callback2Two = Mockito.mock(OmegaCallback.class);
+
+  private final String serviceName1 = uniquify("serviceName1");
+  private final String instanceId1One = uniquify("instanceId1One");
+  private final String instanceId1Two = uniquify("instanceId1Two");
+
+  private final String serviceName2 = uniquify("serviceName2");
+  private final String instanceId2One = uniquify("instanceId2One");
+  private final String instanceId2Two = uniquify("instanceId2Two");
+
+  private final Map<String, Map<String, OmegaCallback>> callbacks = new ConcurrentHashMap<>();
+  private final CompositeOmegaCallback compositeOmegaCallback = new CompositeOmegaCallback(callbacks);
+
+  // registers two instances for each of the two services
+  @Before
+  public void setUp() throws Exception {
+    Map<String, OmegaCallback> service1Callbacks = new ConcurrentHashMap<>();
+    service1Callbacks.put(instanceId1One, callback1One);
+    service1Callbacks.put(instanceId1Two, callback1Two);
+    callbacks.put(serviceName1, service1Callbacks);
+
+    Map<String, OmegaCallback> service2Callbacks = new ConcurrentHashMap<>();
+    service2Callbacks.put(instanceId2One, callback2One);
+    service2Callbacks.put(instanceId2Two, callback2Two);
+    callbacks.put(serviceName2, service2Callbacks);
+  }
+
+  @Test
+  public void compensateCorrespondingOmegaInstanceOnly() throws Exception {
+    TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
+
+    compositeOmegaCallback.compensate(event);
+
+    verify(callback2One).compensate(event);
+    verifyNeverCompensated(event, callback1One, callback1Two, callback2Two);
+  }
+
+  @Test
+  public void compensateOtherOmegaInstance_IfTheRequestedIsUnreachable() throws Exception {
+    callbacks.get(serviceName2).remove(instanceId2One);
+    TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
+
+    compositeOmegaCallback.compensate(event);
+
+    verify(callback2Two).compensate(event);
+    verifyNeverCompensated(event, callback1One, callback1Two, callback2One);
+  }
+
+  @Test
+  public void blowsUpIfNoSuchServiceIsReachable() throws Exception {
+    callbacks.get(serviceName2).clear();
+    TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
+
+    try {
+      compositeOmegaCallback.compensate(event);
+      expectFailing(AlphaException.class);
+    } catch (AlphaException e) {
+      assertThat(e.getMessage(), is("No such omega callback found for service " + serviceName2));
+    }
+
+    verifyNeverCompensated(event, callback1One, callback1Two, callback2One, callback2Two);
+  }
+
+  // asserts that none of the given callbacks was asked to compensate the event
+  private void verifyNeverCompensated(TxEvent event, OmegaCallback... untouched) {
+    for (OmegaCallback each : untouched) {
+      verify(each, never()).compensate(event);
+    }
+  }
+
+  // builds an event for the given service instance with random transaction ids and payload
+  private TxEvent eventOf(String serviceName, String instanceId, EventType eventType) {
+    return new TxEvent(
+        serviceName,
+        instanceId,
+        new Date(),
+        uniquify("globalTxId"),
+        uniquify("localTxId"),
+        UUID.randomUUID().toString(),
+        eventType.name(),
+        getClass().getCanonicalName(),
+        uniquify("blah").getBytes());
+  }
+}
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java
new file mode 100644
index 0000000..27cc16f
--- /dev/null
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.Date;
+import java.util.UUID;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Verifies that {@link RetryOmegaCallback} retries a failing callback and stops
+ * retrying once its thread is interrupted.
+ */
+public class RetryOmegaCallbackTest {
+  // pause between retries, in milliseconds
+  private final int delay = 100;
+  private final OmegaCallback underlying = Mockito.mock(OmegaCallback.class);
+  private final RetryOmegaCallback callback = new RetryOmegaCallback(underlying, delay);
+
+  @Test
+  public void retryOnFailure() throws Exception {
+    TxEvent event = someEvent();
+
+    // fail twice, then succeed on the third attempt
+    doThrow(AlphaException.class)
+        .doThrow(AlphaException.class)
+        .doNothing()
+        .when(underlying)
+        .compensate(event);
+
+    callback.compensate(event);
+
+    verify(underlying, times(3)).compensate(event);
+  }
+
+  @Test
+  public void exitOnInterruption() throws Exception {
+    TxEvent event = someEvent();
+
+    doThrow(AlphaException.class).when(underlying).compensate(event);
+
+    Thread thread = new Thread(() -> callback.compensate(event));
+    thread.start();
+
+    Thread.sleep(300);
+    thread.interrupt();
+
+    // wait for the worker: without this the test passes even if the retry loop never ends
+    thread.join(10 * delay);
+    org.junit.Assert.assertFalse("retry loop kept running after interruption", thread.isAlive());
+
+    verify(underlying, atMost(4)).compensate(event);
+  }
+
+  // builds an event with random identifiers and payload
+  private TxEvent someEvent() {
+    return new TxEvent(
+        uniquify("serviceName"),
+        uniquify("instanceId"),
+        new Date(),
+        uniquify("globalTxId"),
+        uniquify("localTxId"),
+        UUID.randomUUID().toString(),
+        EventType.TxStartedEvent.name(),
+        getClass().getCanonicalName(),
+        uniquify("blah").getBytes());
+  }
+}
diff --git a/alpha/alpha-core/src/test/resources/log4j2-test.xml b/alpha/alpha-core/src/test/resources/log4j2-test.xml
new file mode 100644
index 0000000..58924c6
--- /dev/null
+++ b/alpha/alpha-core/src/test/resources/log4j2-test.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
index 55105d4..f44c951 100644
--- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
+++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java
@@ -17,22 +17,34 @@
 
 package io.servicecomb.saga.alpha.server;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import io.servicecomb.saga.alpha.core.CompositeOmegaCallback;
 import io.servicecomb.saga.alpha.core.OmegaCallback;
+import io.servicecomb.saga.alpha.core.RetryOmegaCallback;
 import io.servicecomb.saga.alpha.core.TxConsistentService;
-import io.servicecomb.saga.alpha.core.TxEvent;
 import io.servicecomb.saga.alpha.core.TxEventRepository;
 
 @Configuration
 class AlphaConfig {
 
+  // TODO: 2017/12/27 to be filled with actual callbacks on completion of SCB-138
   @Bean
-  OmegaCallback omegaCallback() {
-    // TODO: 2017/12/27 to be replaced with actual callback on completion of SCB-138
-    return event -> {};
+  Map<String, Map<String, OmegaCallback>> omegaCallbacks() {
+    return new ConcurrentHashMap<>();
+  }
+
+  @Bean
+  OmegaCallback omegaCallback(
+      Map<String, Map<String, OmegaCallback>> callbacks,
+      @Value("${alpha.compensation.retry.delay:3000}") int delay) {
+
+    return new RetryOmegaCallback(new CompositeOmegaCallback(callbacks), 
delay);
   }
   
   @Bean
@@ -48,7 +60,6 @@ class AlphaConfig {
     return eventRepository;
   }
 
-  // TODO: 2017/12/29 how to match callback with service instance? send some msg on startup?
   private ServerStartable buildGrpc(int port, OmegaCallback omegaCallback, 
TxEventRepository eventRepository) {
     return new GrpcStartable(
         port,

-- 
To stop receiving notification emails like this one, please contact
"commits@servicecomb.apache.org" <commits@servicecomb.apache.org>.

Reply via email to