This is an automated email from the ASF dual-hosted git repository. seanyinx pushed a commit to branch SCB-149_service_aware_callback in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 8d459e2691a41ba6f035efa3baaf71f6a8979bda Author: seanyinx <[email protected]> AuthorDate: Tue Jan 2 10:49:49 2018 +0800 SCB-149 distinguished omega callbacks by service name and instance id Signed-off-by: seanyinx <[email protected]> --- alpha/alpha-core/pom.xml | 9 ++ .../saga/alpha/core/AlphaException.java | 24 ++++ .../saga/alpha/core/CompositeOmegaCallback.java | 44 ++++++++ .../saga/alpha/core/RetryOmegaCallback.java | 71 ++++++++++++ .../alpha/core/CompositeOmegaCallbackTest.java | 122 +++++++++++++++++++++ .../saga/alpha/core/RetryOmegaCallbackTest.java | 79 +++++++++++++ .../alpha-core/src/test/resources/log4j2-test.xml | 30 +++++ .../servicecomb/saga/alpha/server/AlphaConfig.java | 21 +++- 8 files changed, 395 insertions(+), 5 deletions(-) diff --git a/alpha/alpha-core/pom.xml b/alpha/alpha-core/pom.xml index 92f22f6..93c810f 100644 --- a/alpha/alpha-core/pom.xml +++ b/alpha/alpha-core/pom.xml @@ -29,6 +29,11 @@ <artifactId>alpha-core</artifactId> <dependencies> <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> </dependency> @@ -44,6 +49,10 @@ <groupId>org.awaitility</groupId> <artifactId>awaitility</artifactId> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + </dependency> </dependencies> diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/AlphaException.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/AlphaException.java new file mode 100644 index 0000000..a5eb3c4 --- /dev/null +++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/AlphaException.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+/**
+ * Unchecked exception type of the alpha core package.
+ */
+public class AlphaException extends RuntimeException {
+  /**
+   * Creates the exception with a detail message.
+   *
+   * @param cause text passed to the {@link RuntimeException} constructor as
+   *        the detail message; despite its name it is a message, not a
+   *        {@link Throwable}
+   */
+  public AlphaException(String cause) {
+    super(cause);
+  }
+}
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallback.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
new file mode 100644
index 0000000..e5c4b12
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallback.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import java.util.Map;
+
+/**
+ * Routes a compensation request to the omega callback registered for the
+ * event's service name and instance id.
+ *
+ * <p>Callbacks are looked up in a two-level map: service name, then instance
+ * id. The map passed to the constructor is kept by reference (not copied), so
+ * later changes made to it by its owner are visible to this class.
+ */
+public class CompositeOmegaCallback implements OmegaCallback {
+  private final Map<String, Map<String, OmegaCallback>> callbacks;
+
+  /**
+   * @param callbacks registry of callbacks keyed by service name and then by
+   *        instance id
+   */
+  public CompositeOmegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) {
+    this.callbacks = callbacks;
+  }
+
+  /**
+   * Compensates the event through the callback registered for the event's
+   * instance id, or through another callback of the same service when that
+   * instance has none.
+   *
+   * @param event the event to compensate; its service name and instance id
+   *        select the callback
+   * @throws AlphaException if the service has no registered callbacks, either
+   *         because it has no entry in the registry or because its entry is
+   *         empty
+   */
+  @Override
+  public void compensate(TxEvent event) {
+    Map<String, OmegaCallback> serviceCallbacks = callbacks.get(event.serviceName());
+
+    // Map.get yields null for a service that has no entry at all; report it
+    // the same way as a service whose entry is empty instead of failing with
+    // a NullPointerException.
+    if (serviceCallbacks == null || serviceCallbacks.isEmpty()) {
+      throw new AlphaException("No such omega callback found for service " + event.serviceName());
+    }
+
+    OmegaCallback omegaCallback = serviceCallbacks.get(event.instanceId());
+    if (omegaCallback == null) {
+      // The requested instance is not registered: fall back to whichever
+      // callback of the same service the map iterates first.
+      serviceCallbacks.values().iterator().next().compensate(event);
+    } else {
+      omegaCallback.compensate(event);
+    }
+  }
+}
diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java
new file mode 100644
index 0000000..6f1a7dd
--- /dev/null
+++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wraps another {@link OmegaCallback} and keeps invoking it until it returns
+ * without throwing or the calling thread is interrupted. Every failure is
+ * logged and followed by a pause of {@code delay} milliseconds.
+ */
+public class RetryOmegaCallback implements OmegaCallback {
+  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final String ERROR_MESSAGE = "Failed to compensate service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]";
+
+  private final OmegaCallback underlying;
+  private final int delay;
+
+  /**
+   * @param underlying the callback that performs the actual compensation
+   * @param delay pause between attempts, in milliseconds
+   */
+  public RetryOmegaCallback(OmegaCallback underlying, int delay) {
+    this.underlying = underlying;
+    this.delay = delay;
+  }
+
+  /**
+   * Invokes the underlying callback at least once and repeats after each
+   * failure. Returns normally both on success and when the thread is found
+   * interrupted after a failed attempt.
+   *
+   * @param event the event handed to the underlying callback on every attempt
+   */
+  @Override
+  public void compensate(TxEvent event) {
+    while (true) {
+      try {
+        underlying.compensate(event);
+        return;
+      } catch (Exception failure) {
+        logError(ERROR_MESSAGE, event, failure);
+        pauseBeforeRetry(event);
+      }
+
+      // Only reached after a failed attempt: give up once interrupted.
+      if (Thread.currentThread().isInterrupted()) {
+        return;
+      }
+    }
+  }
+
+  /**
+   * Sleeps for the configured delay; if interrupted, logs it and restores the
+   * thread's interrupt flag so the retry loop can observe it.
+   */
+  private void pauseBeforeRetry(TxEvent event) {
+    try {
+      TimeUnit.MILLISECONDS.sleep(delay);
+    } catch (InterruptedException interruption) {
+      logError(ERROR_MESSAGE + " due to interruption", event, interruption);
+
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Logs at error level; the five event attributes fill the message's
+   * placeholders and the exception is passed as the trailing argument.
+   */
+  private void logError(String message, TxEvent event, Exception e) {
+    Object[] arguments = {
+        event.serviceName(),
+        event.instanceId(),
+        event.compensationMethod(),
+        event.globalTxId(),
+        event.localTxId(),
+        e
+    };
+    log.error(message, arguments);
+  }
+}
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
new file mode 100644
index 0000000..38e04a3
--- /dev/null
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/CompositeOmegaCallbackTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static com.seanyinx.github.unit.scaffolding.AssertUtils.expectFailing;
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static io.servicecomb.saga.alpha.core.EventType.TxStartedEvent;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import java.util.Date;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Exercises {@link CompositeOmegaCallback} against two services with two
+ * mocked instance callbacks each.
+ */
+public class CompositeOmegaCallbackTest {
+
+  private final OmegaCallback callback1One = Mockito.mock(OmegaCallback.class);
+  private final OmegaCallback callback1Two = Mockito.mock(OmegaCallback.class);
+
+  private final OmegaCallback callback2One = Mockito.mock(OmegaCallback.class);
+  private final OmegaCallback callback2Two = Mockito.mock(OmegaCallback.class);
+
+  private final String serviceName1 = uniquify("serviceName1");
+  private final String instanceId1One = uniquify("instanceId1One");
+  private final String instanceId1Two = uniquify("instanceId1Two");
+
+  private final String serviceName2 = uniquify("serviceName2");
+  private final String instanceId2One = uniquify("instanceId2One");
+  private final String instanceId2Two = uniquify("instanceId2Two");
+
+  private final Map<String, Map<String, OmegaCallback>> callbacks = new ConcurrentHashMap<>();
+  private final CompositeOmegaCallback compositeOmegaCallback = new CompositeOmegaCallback(callbacks);
+
+  // Registers both instances of each service before every test.
+  @Before
+  public void setUp() throws Exception {
+    Map<String, OmegaCallback> instancesOfService1 = new ConcurrentHashMap<>();
+    callbacks.put(serviceName1, instancesOfService1);
+    instancesOfService1.put(instanceId1One, callback1One);
+    instancesOfService1.put(instanceId1Two, callback1Two);
+
+    Map<String, OmegaCallback> instancesOfService2 = new ConcurrentHashMap<>();
+    callbacks.put(serviceName2, instancesOfService2);
+    instancesOfService2.put(instanceId2One, callback2One);
+    instancesOfService2.put(instanceId2Two, callback2Two);
+  }
+
+  @Test
+  public void compensateCorrespondingOmegaInstanceOnly() throws Exception {
+    TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
+
+    compositeOmegaCallback.compensate(event);
+
+    verifyNeverCompensated(event, callback1One, callback1Two);
+    verify(callback2One).compensate(event);
+    verifyNeverCompensated(event, callback2Two);
+  }
+
+  @Test
+  public void compensateOtherOmegaInstance_IfTheRequestedIsUnreachable() throws Exception {
+    callbacks.get(serviceName2).remove(instanceId2One);
+    TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
+
+    compositeOmegaCallback.compensate(event);
+
+    verifyNeverCompensated(event, callback1One, callback1Two, callback2One);
+    verify(callback2Two).compensate(event);
+  }
+
+  @Test
+  public void blowsUpIfNoSuchServiceIsReachable() throws Exception {
+    callbacks.get(serviceName2).clear();
+    TxEvent event = eventOf(serviceName2, instanceId2One, TxStartedEvent);
+
+    try {
+      compositeOmegaCallback.compensate(event);
+      expectFailing(AlphaException.class);
+    } catch (AlphaException e) {
+      assertThat(e.getMessage(), is("No such omega callback found for service " + serviceName2));
+    }
+
+    verifyNeverCompensated(event, callback1One, callback1Two, callback2One, callback2Two);
+  }
+
+  // Asserts that none of the given callbacks was asked to compensate the event.
+  private void verifyNeverCompensated(TxEvent event, OmegaCallback... idleCallbacks) {
+    for (OmegaCallback idleCallback : idleCallbacks) {
+      verify(idleCallback, never()).compensate(event);
+    }
+  }
+
+  // Builds an event for the given service instance; the remaining fields are filler.
+  private TxEvent eventOf(String serviceName, String instanceId, EventType eventType) {
+    return new TxEvent(
+        serviceName,
+        instanceId,
+        new Date(),
+        uniquify("globalTxId"),
+        uniquify("localTxId"),
+        UUID.randomUUID().toString(),
+        eventType.name(),
+        getClass().getCanonicalName(),
+        uniquify("blah").getBytes());
+  }
+}
diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java
new file mode 100644
index 0000000..27cc16f
--- /dev/null
+++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.servicecomb.saga.alpha.core;
+
+import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.Date;
+import java.util.UUID;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Exercises {@link RetryOmegaCallback} around a mocked underlying callback
+ * with a retry delay of 100 milliseconds.
+ */
+public class RetryOmegaCallbackTest {
+  private final int delay = 100;
+  private final OmegaCallback underlying = Mockito.mock(OmegaCallback.class);
+  private final RetryOmegaCallback callback = new RetryOmegaCallback(underlying, delay);
+
+  // Two failures followed by a success must result in exactly three attempts.
+  @Test
+  public void retryOnFailure() throws Exception {
+    TxEvent event = someEvent();
+
+    doThrow(AlphaException.class)
+        .doThrow(AlphaException.class)
+        .doNothing()
+        .when(underlying)
+        .compensate(event);
+
+    callback.compensate(event);
+
+    verify(underlying, times(3)).compensate(event);
+  }
+
+  // An underlying callback that always fails is retried until the worker
+  // thread is interrupted, after which the retry loop must return.
+  @Test
+  public void exitOnInterruption() throws Exception {
+    TxEvent event = someEvent();
+
+    doThrow(AlphaException.class).when(underlying).compensate(event);
+
+    Thread thread = new Thread(() -> callback.compensate(event));
+    thread.start();
+
+    Thread.sleep(300);
+    thread.interrupt();
+
+    // Wait (bounded) for the worker to leave the retry loop, so the test
+    // really proves the exit and the invocation count is final when verified.
+    thread.join(1000);
+    assertFalse(thread.isAlive());
+
+    verify(underlying, atMost(4)).compensate(event);
+  }
+
+  // Builds an event filled with unique placeholder values.
+  private TxEvent someEvent() {
+    return new TxEvent(
+        uniquify("serviceName"),
+        uniquify("instanceId"),
+        new Date(),
+        uniquify("globalTxId"),
+        uniquify("localTxId"),
+        UUID.randomUUID().toString(),
+        EventType.TxStartedEvent.name(),
+        getClass().getCanonicalName(),
+        uniquify("blah").getBytes());
+  }
+}
diff --git a/alpha/alpha-core/src/test/resources/log4j2-test.xml b/alpha/alpha-core/src/test/resources/log4j2-test.xml
new file mode 100644
index 0000000..58924c6
--- /dev/null
+++ b/alpha/alpha-core/src/test/resources/log4j2-test.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor
license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<Configuration status="WARN"> + <Appenders> + <Console name="Console" target="SYSTEM_OUT"> + <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/> + </Console> + </Appenders> + <Loggers> + <Root level="info"> + <AppenderRef ref="Console"/> + </Root> + </Loggers> +</Configuration> diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java index 55105d4..f44c951 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java @@ -17,22 +17,34 @@ package io.servicecomb.saga.alpha.server; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import io.servicecomb.saga.alpha.core.CompositeOmegaCallback; import io.servicecomb.saga.alpha.core.OmegaCallback; +import io.servicecomb.saga.alpha.core.RetryOmegaCallback; import io.servicecomb.saga.alpha.core.TxConsistentService; -import 
io.servicecomb.saga.alpha.core.TxEvent; import io.servicecomb.saga.alpha.core.TxEventRepository; @Configuration class AlphaConfig { + // TODO: 2017/12/27 to be filled with actual callbacks on completion of SCB-138 @Bean - OmegaCallback omegaCallback() { - // TODO: 2017/12/27 to be replaced with actual callback on completion of SCB-138 - return event -> {}; + Map<String, Map<String, OmegaCallback>> omegaCallbacks() { + return new ConcurrentHashMap<>(); + } + + @Bean + OmegaCallback omegaCallback( + Map<String, Map<String, OmegaCallback>> callbacks, + @Value("${alpha.compensation.retry.delay:3000}") int delay) { + + return new RetryOmegaCallback(new CompositeOmegaCallback(callbacks), delay); } @Bean @@ -48,7 +60,6 @@ class AlphaConfig { return eventRepository; } - // TODO: 2017/12/29 how to match callback with service instance? send some msg on startup? private ServerStartable buildGrpc(int port, OmegaCallback omegaCallback, TxEventRepository eventRepository) { return new GrpcStartable( port, -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
