This is an automated email from the ASF dual-hosted git repository. ningjiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-saga.git
commit 49235b50798956ffdd1a65feb503cf9c1bfb7baa Author: seanyinx <[email protected]> AuthorDate: Tue Jan 2 16:29:43 2018 +0800 SCB-149 pushed failed compensations to a scheduled task queue Signed-off-by: seanyinx <[email protected]> --- .../saga/alpha/core/PendingTaskRunner.java | 45 ++++++++++++ ...egaCallback.java => PushBackOmegaCallback.java} | 38 ++++------- .../saga/alpha/core/TxConsistentService.java | 1 - .../saga/alpha/core/PendingTaskRunnerTest.java | 60 ++++++++++++++++ .../saga/alpha/core/PushBackOmegaCallbackTest.java | 65 ++++++++++++++++++ .../saga/alpha/core/RetryOmegaCallbackTest.java | 79 ---------------------- .../servicecomb/saga/alpha/core/TxEventMaker.java | 36 ++++++++++ .../servicecomb/saga/alpha/server/AlphaConfig.java | 23 +++++-- 8 files changed, 235 insertions(+), 112 deletions(-) diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PendingTaskRunner.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PendingTaskRunner.java new file mode 100644 index 0000000..c9a06fa --- /dev/null +++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PendingTaskRunner.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.servicecomb.saga.alpha.core; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public class PendingTaskRunner { + private final BlockingQueue<Runnable> pendingTasks; + private final int delay; + + public PendingTaskRunner(BlockingQueue<Runnable> pendingTasks, int delay) { + this.pendingTasks = pendingTasks; + this.delay = delay; + } + + public Future<?> run() { + return Executors.newSingleThreadScheduledExecutor() + .scheduleWithFixedDelay(() -> { + try { + pendingTasks.take().run(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }, 0, delay, MILLISECONDS); + } +} diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallback.java similarity index 58% rename from alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java rename to alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallback.java index 6f1a7dd..8403af0 100644 --- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/RetryOmegaCallback.java +++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallback.java @@ -18,49 +18,35 @@ package io.servicecomb.saga.alpha.core; import java.lang.invoke.MethodHandles; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.BlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RetryOmegaCallback implements OmegaCallback { +public class PushBackOmegaCallback implements OmegaCallback { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static final String ERROR_MESSAGE = "Failed to compensate service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]"; + private final BlockingQueue<Runnable> pendingCompensations; private final OmegaCallback underlying; - private final int delay; - public RetryOmegaCallback(OmegaCallback underlying, int delay) { + public PushBackOmegaCallback(BlockingQueue<Runnable> pendingCompensations, OmegaCallback underlying) { + this.pendingCompensations = pendingCompensations; this.underlying = underlying; - this.delay = delay; } @Override public void compensate(TxEvent event) { - boolean success = false; - do { - try { - underlying.compensate(event); - success = true; - } catch (Exception e) { - logError(ERROR_MESSAGE, event, e); - sleep(event); - } - } while (!success && !Thread.currentThread().isInterrupted()); - } - - private void sleep(TxEvent event) { try { - TimeUnit.MILLISECONDS.sleep(delay); - } catch (InterruptedException e) { - logError(ERROR_MESSAGE + " due to interruption", event, e); - - Thread.currentThread().interrupt(); + underlying.compensate(event); + } catch (Exception e) { + logError(event, e); + pendingCompensations.offer(() -> compensate(event)); } } - private void logError(String message, TxEvent event, Exception e) { - log.error(message, + private void logError(TxEvent event, Exception e) { + log.error( + "Failed to compensate service [{}] instance [{}] with method [{}], global tx id [{}] and local tx id [{}]", event.serviceName(), event.instanceId(), event.compensationMethod(), diff --git a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java index 5dcb7bc..6fd9193 100644 --- a/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java +++ b/alpha/alpha-core/src/main/java/io/servicecomb/saga/alpha/core/TxConsistentService.java @@ -46,7 +46,6 @@ public class TxConsistentService { CompletableFuture.runAsync(() -> eventCallbacks.getOrDefault(event.type(), DO_NOTHING_CONSUMER).accept(event)); } - // TODO: 2017/12/27 we must define a way to find which service to compensate, to avoid sending to all private void compensate(TxEvent event) { List<TxEvent> events = eventRepository.findStartedTransactions(event.globalTxId(), TxStartedEvent.name()); events.forEach(omegaCallback::compensate); diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PendingTaskRunnerTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PendingTaskRunnerTest.java new file mode 100644 index 0000000..d806eec --- /dev/null +++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PendingTaskRunnerTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.servicecomb.saga.alpha.core; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.junit.Test; + +public class PendingTaskRunnerTest { + private final List<String> messages = new ArrayList<>(); + private final BlockingQueue<Runnable> runnables = new LinkedBlockingQueue<>(); + private final PendingTaskRunner taskRunner = new PendingTaskRunner(runnables, 10); + + @Test + public void burnsAllTasksInQueue() throws Exception { + runnables.offer(() -> messages.add("hello")); + runnables.offer(() -> messages.add("world")); + + taskRunner.run(); + + await().atMost(500, MILLISECONDS).until(runnables::isEmpty); + + assertThat(messages, contains("hello", "world")); + } + + @Test + public void exitOnInterruption() throws Exception { + taskRunner.run().cancel(true); + + runnables.offer(() -> messages.add("hello")); + Thread.sleep(300); + + assertThat(runnables.isEmpty(), is(false)); + assertThat(messages.isEmpty(), is(true)); + } +} diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java new file mode 100644 index 0000000..f53624c --- /dev/null +++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/PushBackOmegaCallbackTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.servicecomb.saga.alpha.core; + +import static io.servicecomb.saga.alpha.core.TxEventMaker.someEvent; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class PushBackOmegaCallbackTest { + private static final Runnable NO_OP_RUNNABLE = () -> { + }; + + private final OmegaCallback underlying = Mockito.mock(OmegaCallback.class); + private final BlockingQueue<Runnable> runnables = new LinkedBlockingQueue<>(); + private final PushBackOmegaCallback pushBack = new PushBackOmegaCallback(runnables, underlying); + + @Before + public void setUp() throws Exception { + runnables.offer(NO_OP_RUNNABLE); + } + + @Test + public void pushFailedCallbackToEndOfQueue() throws Exception { + TxEvent event = someEvent(); + doThrow(AlphaException.class).doThrow(AlphaException.class).doNothing().when(underlying).compensate(event); + + pushBack.compensate(event); + + assertThat(runnables.size(), is(2)); + assertThat(runnables.poll(), is(NO_OP_RUNNABLE)); + + // failed again and pushed back itself to queue + runnables.poll().run(); + assertThat(runnables.size(), is(1)); + + runnables.poll().run(); + + verify(underlying, times(3)).compensate(event); + } +} diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java deleted file mode 100644 index 27cc16f..0000000 --- a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/RetryOmegaCallbackTest.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.servicecomb.saga.alpha.core; - -import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; -import static org.mockito.Mockito.atMost; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.Date; -import java.util.UUID; - -import org.junit.Test; -import org.mockito.Mockito; - -public class RetryOmegaCallbackTest { - private final int delay = 100; - private final OmegaCallback underlying = Mockito.mock(OmegaCallback.class); - private final RetryOmegaCallback callback = new RetryOmegaCallback(underlying, delay); - - @Test - public void retryOnFailure() throws Exception { - TxEvent event = someEvent(); - - doThrow(AlphaException.class) - .doThrow(AlphaException.class) - .doNothing() - .when(underlying) - .compensate(event); - - callback.compensate(event); - - verify(underlying, times(3)).compensate(event); - } - - @Test - public void exitOnInterruption() throws Exception { - TxEvent event = someEvent(); - - doThrow(AlphaException.class).when(underlying).compensate(event); - - Thread thread = new Thread(() -> callback.compensate(event)); - thread.start(); - - Thread.sleep(300); - thread.interrupt(); - - verify(underlying, atMost(4)).compensate(event); - } - - private TxEvent someEvent() { - return new TxEvent( - uniquify("serviceName"), - uniquify("instanceId"), - new Date(), - uniquify("globalTxId"), - uniquify("localTxId"), - UUID.randomUUID().toString(), - EventType.TxStartedEvent.name(), - getClass().getCanonicalName(), - uniquify("blah").getBytes()); - } -} diff --git a/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxEventMaker.java b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxEventMaker.java new file mode 100644 index 0000000..77ef920 --- /dev/null +++ b/alpha/alpha-core/src/test/java/io/servicecomb/saga/alpha/core/TxEventMaker.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.servicecomb.saga.alpha.core;import static com.seanyinx.github.unit.scaffolding.Randomness.uniquify; + +import java.util.Date; +import java.util.UUID; + +class TxEventMaker { + static TxEvent someEvent() { + return new TxEvent( + uniquify("serviceName"), + uniquify("instanceId"), + new Date(), + uniquify("globalTxId"), + uniquify("localTxId"), + UUID.randomUUID().toString(), + EventType.TxStartedEvent.name(), + TxEventMaker.class.getCanonicalName(), + uniquify("blah").getBytes()); + } +} diff --git a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java index f44c951..3685894 100644 --- a/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java +++ b/alpha/alpha-server/src/main/java/io/servicecomb/saga/alpha/server/AlphaConfig.java @@ -18,7 +18,11 @@ package io.servicecomb.saga.alpha.server; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; @@ -26,12 +30,17 @@ import org.springframework.context.annotation.Configuration; import io.servicecomb.saga.alpha.core.CompositeOmegaCallback; import io.servicecomb.saga.alpha.core.OmegaCallback; -import io.servicecomb.saga.alpha.core.RetryOmegaCallback; +import io.servicecomb.saga.alpha.core.PendingTaskRunner; +import io.servicecomb.saga.alpha.core.PushBackOmegaCallback; import io.servicecomb.saga.alpha.core.TxConsistentService; import io.servicecomb.saga.alpha.core.TxEventRepository; @Configuration class AlphaConfig { + private final BlockingQueue<Runnable> pendingCompensations = new LinkedBlockingQueue<>(); + + @Value("${alpha.compensation.retry.delay:3000}") + private int delay; // TODO: 2017/12/27 to be filled with actual callbacks on completion of SCB-138 @Bean @@ -40,11 +49,8 @@ class AlphaConfig { } @Bean - OmegaCallback omegaCallback( - Map<String, Map<String, OmegaCallback>> callbacks, - @Value("${alpha.compensation.retry.delay:3000}") int delay) { - - return new RetryOmegaCallback(new CompositeOmegaCallback(callbacks), delay); + OmegaCallback omegaCallback(Map<String, Map<String, OmegaCallback>> callbacks) { + return new PushBackOmegaCallback(pendingCompensations, new CompositeOmegaCallback(callbacks)); } @Bean @@ -68,4 +74,9 @@ class AlphaConfig { eventRepository, omegaCallback))); } + + @PostConstruct + void init() { + new PendingTaskRunner(pendingCompensations, delay).run(); + } } -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
