http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/query/events/StatusChange.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/query/events/StatusChange.java b/lens-server-api/src/main/java/org/apache/lens/server/api/query/events/StatusChange.java new file mode 100644 index 0000000..6169744 --- /dev/null +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/query/events/StatusChange.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.server.api.query.events; + +import org.apache.lens.api.query.QueryHandle; +import org.apache.lens.api.query.QueryStatus; + +/** + * The Class StatusChange. + */ +public abstract class StatusChange extends QueryEvent<QueryStatus.Status> { + + /** + * Instantiates a new status change. 
+ * + * @param eventTime the event time + * @param prev the prev + * @param current the current + * @param handle the handle + */ + public StatusChange(long eventTime, QueryStatus.Status prev, QueryStatus.Status current, QueryHandle handle) { + super(eventTime, prev, current, handle); + } + + /** + * Check current state. + * + * @param status the status + */ + protected void checkCurrentState(QueryStatus.Status status) { + if (currentValue != status) { + throw new IllegalStateException("Invalid query state: " + currentValue + " query:" + queryHandle.toString()); + } + } + +}
http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/BackOffRetryHandler.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/BackOffRetryHandler.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/BackOffRetryHandler.java new file mode 100644 index 0000000..5ea5710 --- /dev/null +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/BackOffRetryHandler.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.lens.server.api.retry; + +import java.io.Serializable; + +/** + * A backoff retry handler. + * + * This allows a backoff on any call, so it provides methods to check whether we can try the operation now, + * what the next time is at which the operation can be performed, and whether the operation has exhausted all retries.
+ * Callers of this can do the following : + * + * if (handler.canTryOpNow(FailureContext)) { + * try { + * tryCallerOperation(); + * FailureContext.clear(); + * } catch (any Transient Exception) { + * FailureContext.updateFailure(); + * if (!handler.hasExhaustedRetries(FailureContext)) { + * // will be tried later again + * } + * throw exception; + * } + * } + * + * Note that this is only one of the possible use cases, other complex use cases are in retry framework. + */ +public interface BackOffRetryHandler<FC extends FailureContext> extends Serializable { + + /** + * To know whether operation can be done now. + * + * @param failContext FailureContext holding failures till now. + * + * @return true if operation can be done now, false otherwise. + */ + boolean canTryOpNow(FC failContext); + + /** + * Get the time when the operation can be done next. + * + * @param failContext FC holding failures till now. + * + * @return Next operation time in millis since epoch + */ + long getOperationNextTime(FC failContext); + + /** + * Has the operation exhausted all its retries + * + * @param failContext FC holding failures till now. + * + * @return true if all retries have exhausted, false otherwise. + */ + boolean hasExhaustedRetries(FC failContext); +} http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/ChainedRetryPolicyDecider.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/ChainedRetryPolicyDecider.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/ChainedRetryPolicyDecider.java new file mode 100644 index 0000000..46526ad --- /dev/null +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/ChainedRetryPolicyDecider.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.server.api.retry; + +import java.util.List; + +import org.apache.lens.server.api.error.LensException; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Lists; +import lombok.Data; + +@Data +public class ChainedRetryPolicyDecider<FC extends FailureContext> implements RetryPolicyDecider<FC> { + private final Iterable<RetryPolicyDecider<FC>> policyDeciders; + + @Override + public BackOffRetryHandler<FC> decidePolicy(String errorMessage) { + for (RetryPolicyDecider<FC> policyDecider : policyDeciders) { + BackOffRetryHandler<FC> policy = policyDecider.decidePolicy(errorMessage); + if (policy != null) { + return policy; + } + } + return new NoRetryHandler<>(); + } + public static <FC extends FailureContext> ChainedRetryPolicyDecider<FC> from(Configuration conf, String key) + throws LensException { + String[] classNames = conf.getStrings(key); + List<RetryPolicyDecider<FC>> retryPolicyDeciders = Lists.newArrayList(); + if (classNames != null) { + for (String className: classNames) { + Class<? extends RetryPolicyDecider<FC>> clazz; + try { + clazz = (Class<? 
extends RetryPolicyDecider<FC>>) conf.getClassByName(className) + .asSubclass(RetryPolicyDecider.class); + } catch (ClassNotFoundException e) { + throw new LensException("Couldn't load class " + className, e); + } + RetryPolicyDecider<FC> instance; + try { + instance = clazz.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new LensException("Couldn't create instance of class " + clazz.getName(), e); + } + if (instance instanceof Configurable) { + ((Configurable) instance).setConf(conf); + } + retryPolicyDeciders.add(instance); + } + } + return new ChainedRetryPolicyDecider<>(retryPolicyDeciders); + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/DefaultRetryPolicyDecider.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/DefaultRetryPolicyDecider.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/DefaultRetryPolicyDecider.java new file mode 100644 index 0000000..0f5ba26 --- /dev/null +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/DefaultRetryPolicyDecider.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.server.api.retry; + +public class DefaultRetryPolicyDecider<FC extends FailureContext> implements RetryPolicyDecider<FC> { + + @Override + public BackOffRetryHandler<FC> decidePolicy(String errorMessage) { + return new NoRetryHandler<>(); + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/FailureContext.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/FailureContext.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/FailureContext.java new file mode 100644 index 0000000..24e3d5a --- /dev/null +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/FailureContext.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.server.api.retry; + +/** + * Any data structure that has fail and retry capability should deal with an implementation of this interface. 
+ * @see org.apache.lens.server.api.query.StatusUpdateFailureContext + * @see org.apache.lens.server.api.query.QueryContext + */ +public interface FailureContext { + /** + * + * @return Last time of failure + */ + long getLastFailedTime(); + + /** + * + * @return number of times failure has occurred so far + */ + int getFailCount(); +} http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/FibonacciExponentialBackOffRetryHandler.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/FibonacciExponentialBackOffRetryHandler.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/FibonacciExponentialBackOffRetryHandler.java new file mode 100644 index 0000000..01da25d --- /dev/null +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/FibonacciExponentialBackOffRetryHandler.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.lens.server.api.retry; + +import static com.google.common.base.Preconditions.checkArgument; + + +/** + * An exponential backoff retry handler.
+ * + * It allows the failures to be retried at a next update time, which can increase exponentially. + * + */ +public class FibonacciExponentialBackOffRetryHandler<FC extends FailureContext> implements BackOffRetryHandler<FC> { + final int[] fibonacci; + final long maxDelay; + final long waitMillis; + + public FibonacciExponentialBackOffRetryHandler(int numRetries, long maxDelay, long waitMillis) { + checkArgument(numRetries > 2); + fibonacci = new int[numRetries]; + fibonacci[0] = 1; + fibonacci[1] = 1; + for(int i = 2; i < numRetries; ++i) { + fibonacci[i] = fibonacci[i-1] + fibonacci[i-2]; + } + this.maxDelay = maxDelay; + this.waitMillis = waitMillis; + } + + public boolean canTryOpNow(FC failContext) { + synchronized (failContext) { + if (failContext.getFailCount() != 0) { + long now = System.currentTimeMillis(); + if (now < getOperationNextTime(failContext)) { + return false; + } + } + return true; + } + } + + public long getOperationNextTime(FC failContext) { + synchronized (failContext) { + if (failContext.getFailCount() >= fibonacci.length) { + return failContext.getLastFailedTime() + maxDelay; + } + long delay = Math.min(maxDelay, fibonacci[failContext.getFailCount()] * waitMillis); + return failContext.getLastFailedTime() + delay; + } + } + + public boolean hasExhaustedRetries(FC failContext) { + synchronized (failContext) { + if (failContext.getFailCount() >= fibonacci.length) { + return true; + } + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/ImmediateRetryHandler.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/ImmediateRetryHandler.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/ImmediateRetryHandler.java new file mode 100644 index 0000000..c1c0126 --- /dev/null +++
b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/ImmediateRetryHandler.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.server.api.retry; + +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +public class ImmediateRetryHandler<FC extends FailureContext> implements BackOffRetryHandler<FC> { + private final int retries; + private int retriesDone = 0; + // default 1 retry + public ImmediateRetryHandler() { + this(1); + } + + @Override + public boolean canTryOpNow(FC failContext) { + return true; + } + + @Override + public long getOperationNextTime(FC failContext) { + return System.currentTimeMillis(); + } + + @Override + public boolean hasExhaustedRetries(FC failContext) { + return ++retriesDone > retries; + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/NoRetryHandler.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/NoRetryHandler.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/NoRetryHandler.java new file mode 100644 index 
0000000..df68a48 --- /dev/null +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/NoRetryHandler.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.server.api.retry; + +public class NoRetryHandler<FC extends FailureContext> extends ImmediateRetryHandler<FC> { + NoRetryHandler() { + super(0); + } + + @Override + public boolean canTryOpNow(FC failContext) { + return false; + } + + @Override + public long getOperationNextTime(FC failContext) { + return Long.MAX_VALUE; + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/OperationRetryHandlerFactory.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/OperationRetryHandlerFactory.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/OperationRetryHandlerFactory.java new file mode 100644 index 0000000..35fdaca --- /dev/null +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/OperationRetryHandlerFactory.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.lens.server.api.retry; + +/** + * Factory which creates operation retry handler + */ +public class OperationRetryHandlerFactory { + private OperationRetryHandlerFactory() { + } + + /** + * Create exponential backoff handler + * + * @param numRetries Number of exponential backoff retries + * @param maxDelay Maximum delay an operation can wait before its next attempt + * @param waitMillis Number of millis that would grow exponentially in case of failures + * + * @return BackOffRetryHandler + */ + public static <FC extends FailureContext> BackOffRetryHandler<FC> createExponentialBackOffHandler(int numRetries, + long maxDelay, long waitMillis) { + return new FibonacciExponentialBackOffRetryHandler<>(numRetries, maxDelay, waitMillis); + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/retry/RetryPolicyDecider.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/retry/RetryPolicyDecider.java b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/RetryPolicyDecider.java new file mode 100644 index 0000000..88448b0 --- /dev/null +++
b/lens-server-api/src/main/java/org/apache/lens/server/api/retry/RetryPolicyDecider.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.server.api.retry; + + +public interface RetryPolicyDecider<FC extends FailureContext> { + + BackOffRetryHandler<FC> decidePolicy(String errorMessage); +} http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/main/java/org/apache/lens/server/api/util/LensUtil.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/main/java/org/apache/lens/server/api/util/LensUtil.java b/lens-server-api/src/main/java/org/apache/lens/server/api/util/LensUtil.java index 8261d8a..9d732c1 100644 --- a/lens-server-api/src/main/java/org/apache/lens/server/api/util/LensUtil.java +++ b/lens-server-api/src/main/java/org/apache/lens/server/api/util/LensUtil.java @@ -26,6 +26,7 @@ import java.util.Set; import org.apache.lens.server.api.common.ConfigBasedObjectCreationFactory; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import com.google.common.collect.ImmutableSet; @@ -69,26 +70,23 @@ public 
final class LensUtil { public static boolean isSocketException(@NonNull Throwable e) { Throwable cause = getCause(e); - if (cause instanceof SocketException || cause instanceof SocketTimeoutException) { - return true; - } - return false; + return cause instanceof SocketException || cause instanceof SocketTimeoutException; } - public static <T> ImmutableSet<T> getImplementations(final String factoriesKey, final Configuration conf) { - + public static <T> Set<T> getImplementationsMutable(final String factoriesKey, final Configuration conf) { Set<T> implSet = Sets.newLinkedHashSet(); final String[] factoryNames = conf.getStrings(factoriesKey); - - if (factoryNames == null) { - return ImmutableSet.copyOf(implSet); - } - - for (String factoryName : factoryNames) { - if (StringUtils.isNotBlank(factoryName)) { - final T implementation = getImplementation(factoryName.trim(), conf); - implSet.add(implementation); + if (factoryNames != null) { + for (String factoryName : factoryNames) { + if (StringUtils.isNotBlank(factoryName)) { + final T implementation = getImplementation(factoryName.trim(), conf); + implSet.add(implementation); + } } } + return implSet; + } + public static <T> ImmutableSet<T> getImplementations(final String factoriesKey, final Configuration conf) { + Set<T> implSet = getImplementationsMutable(factoriesKey, conf); return ImmutableSet.copyOf(implSet); } @@ -97,7 +95,11 @@ public final class LensUtil { try { ConfigBasedObjectCreationFactory<T> factory = (ConfigBasedObjectCreationFactory<T>) Class.forName(factoryName).newInstance(); - return factory.create(conf); + T ret = factory.create(conf); + if (ret instanceof Configurable) { + ((Configurable) ret).setConf(conf); + } + return ret; } catch (final ReflectiveOperationException e) { throw new IllegalStateException(e); } http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/test/java/org/apache/lens/server/api/common/TestExponentialBackOffRetryHandler.java 
---------------------------------------------------------------------- diff --git a/lens-server-api/src/test/java/org/apache/lens/server/api/common/TestExponentialBackOffRetryHandler.java b/lens-server-api/src/test/java/org/apache/lens/server/api/common/TestExponentialBackOffRetryHandler.java deleted file mode 100644 index 5f407af..0000000 --- a/lens-server-api/src/test/java/org/apache/lens/server/api/common/TestExponentialBackOffRetryHandler.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.lens.server.api.common; - -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; - -import org.testng.annotations.Test; - -public class TestExponentialBackOffRetryHandler { - - @Test - public void testExponentialBackOff() { - FailureContext failures = new FailureContext(); - BackOffRetryHandler retryHandler = OperationRetryHandlerFactory.createExponentialBackOffHandler(10, 10000, 1000); - assertFalse(retryHandler.hasExhaustedRetries(failures)); - assertTrue(retryHandler.canTryOpNow(failures)); - - long now = System.currentTimeMillis(); - failures.updateFailure(); - assertFalse(retryHandler.hasExhaustedRetries(failures)); - assertFalse(retryHandler.canTryOpNow(failures)); - assertTrue(now + 500 < retryHandler.getOperationNextTime(failures)); - assertTrue(now + 15000 > retryHandler.getOperationNextTime(failures)); - - for (int i = 0; i < 10; i++) { - failures.updateFailure(); - } - assertTrue(retryHandler.hasExhaustedRetries(failures)); - assertFalse(retryHandler.canTryOpNow(failures)); - - failures.clear(); - assertFalse(retryHandler.hasExhaustedRetries(failures)); - assertTrue(retryHandler.canTryOpNow(failures)); - } -} http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java b/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java index 3c18ac7..2a2963f 100644 --- a/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java +++ b/lens-server-api/src/test/java/org/apache/lens/server/api/driver/MockDriver.java @@ -58,7 +58,7 @@ public class MockDriver extends AbstractLensDriver { /** * The conf. */ - Configuration conf; + protected Configuration conf; /** * The query. 
@@ -99,7 +99,7 @@ public class MockDriver extends AbstractLensDriver { this.conf = conf; ioTestVal = conf.getInt("mock.driver.test.val", -1); this.conf.addResource(getDriverResourcePath("failing-query-driver-site.xml")); - getQueryHook().setDriver(this); + loadQueryHook(); } @Override http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/test/java/org/apache/lens/server/api/query/TestQueryContext.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/test/java/org/apache/lens/server/api/query/TestQueryContext.java b/lens-server-api/src/test/java/org/apache/lens/server/api/query/TestQueryContext.java index a530e9d..1560bf1 100644 --- a/lens-server-api/src/test/java/org/apache/lens/server/api/query/TestQueryContext.java +++ b/lens-server-api/src/test/java/org/apache/lens/server/api/query/TestQueryContext.java @@ -24,10 +24,11 @@ import static org.testng.Assert.*; import java.util.List; import org.apache.lens.api.LensConf; -import org.apache.lens.server.api.common.*; import org.apache.lens.server.api.driver.LensDriver; import org.apache.lens.server.api.driver.MockDriver; import org.apache.lens.server.api.error.LensException; +import org.apache.lens.server.api.retry.BackOffRetryHandler; +import org.apache.lens.server.api.retry.FibonacciExponentialBackOffRetryHandler; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/test/java/org/apache/lens/server/api/query/comparators/ChainedComparatorTest.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/test/java/org/apache/lens/server/api/query/comparators/ChainedComparatorTest.java b/lens-server-api/src/test/java/org/apache/lens/server/api/query/comparators/ChainedComparatorTest.java new file mode 100644 index 0000000..cc58751 --- /dev/null +++ 
b/lens-server-api/src/test/java/org/apache/lens/server/api/query/comparators/ChainedComparatorTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.server.api.query.comparators; + +import static org.testng.Assert.*; + +import java.util.Comparator; + +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import com.google.common.collect.Lists; +import lombok.Data; + +public class ChainedComparatorTest { + @Data + private static class Tuple { + final Integer a, b, c; + } + + private Tuple tuple(Integer a, Integer b, Integer c) { + return new Tuple(a, b, c); + } + + public static final ChainedComparator<Tuple> COMPARATOR = new ChainedComparator<>(Lists.newArrayList( + new Comparator<Tuple>() { + @Override + public int compare(Tuple o1, Tuple o2) { + return o1.getA().compareTo(o2.getA()); + } + }, + new Comparator<Tuple>() { + + @Override + public int compare(Tuple o1, Tuple o2) { + return o1.getB().compareTo(o2.getB()); + } + }, + new Comparator<Tuple>() { + @Override + public int compare(Tuple o1, Tuple o2) { + return o1.getC().compareTo(o2.getC()); + } + } + )); + + @DataProvider + public Object[][] comparisonData() { + return new 
Object[][]{ + {tuple(0, 0, 0), tuple(0, 0, 0), 0}, + {tuple(0, 0, 1), tuple(0, 0, 0), 1}, + {tuple(0, 0, 1), tuple(0, 0, 4), -1}, + {tuple(0, 0, 1), tuple(1, 0, 4), -1}, + {tuple(0, 0, 1), tuple(0, -10, 4), 1}, + }; + } + + @Test(dataProvider = "comparisonData") + public void testCompare(Tuple a, Tuple b, int expected) throws Exception { + assertEquals(COMPARATOR.compare(a, b), expected); + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/test/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintTest.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/test/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintTest.java b/lens-server-api/src/test/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintTest.java index 122409b..2667ebf 100644 --- a/lens-server-api/src/test/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintTest.java +++ b/lens-server-api/src/test/java/org/apache/lens/server/api/query/constraint/MaxConcurrentDriverQueriesConstraintTest.java @@ -24,7 +24,8 @@ import static org.apache.lens.server.api.LensServerAPITestUtil.getConfiguration; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; import java.util.HashSet; import java.util.Set; @@ -164,9 +165,13 @@ public class MaxConcurrentDriverQueriesConstraintTest { when(mockCandidateQuery.getSelectedDriver()).thenReturn(mockDriver); when(mockLaunchedQueries.getQueriesCount(mockDriver)).thenReturn(currentDriverLaunchedQueries); - boolean actualCanLaunch = constraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries); + String actualCanLaunch = constraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries); - 
assertEquals(actualCanLaunch, expectedCanLaunch); + if (expectedCanLaunch) { + assertNull(actualCanLaunch); + } else { + assertNotNull(actualCanLaunch); + } } @Test(dataProvider = "dpTestConcurrentLaunches") @@ -186,9 +191,13 @@ public class MaxConcurrentDriverQueriesConstraintTest { when(mockLaunchedQueries.getQueriesCount(mockDriver)).thenReturn(currentDriverLaunchedQueries); when(mockLaunchedQueries.getQueries(mockDriver)).thenReturn(queries); - boolean actualCanLaunch = constraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries); + String actualCanLaunch = constraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries); - assertEquals(actualCanLaunch, expectedCanLaunch); + if (expectedCanLaunch) { + assertNull(actualCanLaunch); + } else { + assertNotNull(actualCanLaunch); + } } @Test(dataProvider = "dpTestPerQueueConstraints") @@ -208,9 +217,13 @@ public class MaxConcurrentDriverQueriesConstraintTest { QueryContext mockCandidateQuery = mock(QueryContext.class); when(mockCandidateQuery.getQueue()).thenReturn(candidateQueue); when(mockCandidateQuery.getSelectedDriver()).thenReturn(mockDriver); - boolean actualCanLaunch = perQueueConstraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries); + String actualCanLaunch = perQueueConstraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries); - assertEquals(actualCanLaunch, expectedCanLaunch); + if (expectedCanLaunch) { + assertNull(actualCanLaunch); + } else { + assertNotNull(actualCanLaunch); + } } @Test(dataProvider = "dpTestPerPriorityConstraints") @@ -230,9 +243,13 @@ public class MaxConcurrentDriverQueriesConstraintTest { QueryContext mockCandidateQuery = mock(QueryContext.class); when(mockCandidateQuery.getPriority()).thenReturn(candidatePriority); when(mockCandidateQuery.getSelectedDriver()).thenReturn(mockDriver); - boolean actualCanLaunch = perPriorityConstraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries); + String actualCanLaunch = 
perPriorityConstraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries); - assertEquals(actualCanLaunch, expectedCanLaunch); + if (expectedCanLaunch) { + assertNull(actualCanLaunch); + } else { + assertNotNull(actualCanLaunch); + } } @Test(dataProvider = "dpTestPerQueuePerPriorityConstraints") @@ -254,8 +271,12 @@ public class MaxConcurrentDriverQueriesConstraintTest { when(mockCandidateQuery.getQueue()).thenReturn(candidateQueue); when(mockCandidateQuery.getPriority()).thenReturn(candidatePriority); when(mockCandidateQuery.getSelectedDriver()).thenReturn(mockDriver); - boolean actualCanLaunch = perQueueAndPerPriorityConstraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries); + String actualCanLaunch = perQueueAndPerPriorityConstraint.allowsLaunchOf(mockCandidateQuery, mockLaunchedQueries); - assertEquals(actualCanLaunch, expectedCanLaunch); + if (expectedCanLaunch) { + assertNull(actualCanLaunch); + } else { + assertNotNull(actualCanLaunch); + } } } http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server-api/src/test/java/org/apache/lens/server/api/retry/TestExponentialBackOffRetryHandler.java ---------------------------------------------------------------------- diff --git a/lens-server-api/src/test/java/org/apache/lens/server/api/retry/TestExponentialBackOffRetryHandler.java b/lens-server-api/src/test/java/org/apache/lens/server/api/retry/TestExponentialBackOffRetryHandler.java new file mode 100644 index 0000000..26261dd --- /dev/null +++ b/lens-server-api/src/test/java/org/apache/lens/server/api/retry/TestExponentialBackOffRetryHandler.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.server.api.retry; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import org.apache.lens.server.api.query.StatusUpdateFailureContext; + +import org.testng.annotations.Test; + +public class TestExponentialBackOffRetryHandler { + + @Test + public void testExponentialBackOff() { + StatusUpdateFailureContext failures = new StatusUpdateFailureContext(); + BackOffRetryHandler<StatusUpdateFailureContext> retryHandler + = OperationRetryHandlerFactory.createExponentialBackOffHandler(10, 10000, 1000); + assertFalse(retryHandler.hasExhaustedRetries(failures)); + assertTrue(retryHandler.canTryOpNow(failures)); + + long now = System.currentTimeMillis(); + failures.updateFailure(); + assertFalse(retryHandler.hasExhaustedRetries(failures)); + assertFalse(retryHandler.canTryOpNow(failures)); + assertTrue(now + 500 < retryHandler.getOperationNextTime(failures)); + assertTrue(now + 15000 > retryHandler.getOperationNextTime(failures)); + + for (int i = 0; i < 10; i++) { + failures.updateFailure(); + } + assertTrue(retryHandler.hasExhaustedRetries(failures)); + assertFalse(retryHandler.canTryOpNow(failures)); + + failures.clear(); + assertFalse(retryHandler.hasExhaustedRetries(failures)); + assertTrue(retryHandler.canTryOpNow(failures)); + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/pom.xml 
---------------------------------------------------------------------- diff --git a/lens-server/pom.xml b/lens-server/pom.xml index 6dea9a7..d24dc1e 100644 --- a/lens-server/pom.xml +++ b/lens-server/pom.xml @@ -372,7 +372,7 @@ <environmentVariables> <MVN_CLASSPATH_FILE>${mvn.classpath.file}</MVN_CLASSPATH_FILE> </environmentVariables> - <argLine>-Xms256m -Xmx512m -XX:PermSize=256m -XX:MaxPermSize=256m</argLine> + <argLine>-Xms256m -Xmx1024m -XX:PermSize=256m -XX:MaxPermSize=256m</argLine> </configuration> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java index b88c717..9f14396 100644 --- a/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/metrics/MetricsServiceImpl.java @@ -39,7 +39,7 @@ import org.apache.lens.server.api.health.HealthStatus; import org.apache.lens.server.api.metastore.CubeMetastoreService; import org.apache.lens.server.api.metrics.*; import org.apache.lens.server.api.query.QueryExecutionService; -import org.apache.lens.server.api.query.StatusChange; +import org.apache.lens.server.api.query.events.StatusChange; import org.apache.lens.server.api.session.*; import org.apache.lens.server.healthcheck.LensServiceHealthCheck; import org.apache.lens.server.query.QueryExecutionServiceImpl; http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/FIFOQueryComparator.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/FIFOQueryComparator.java 
b/lens-server/src/main/java/org/apache/lens/server/query/FIFOQueryComparator.java deleted file mode 100644 index 75c1146..0000000 --- a/lens-server/src/main/java/org/apache/lens/server/query/FIFOQueryComparator.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.lens.server.query; - -import org.apache.lens.server.api.query.QueryContext; - -public class FIFOQueryComparator implements QueryComparator { - - @Override - public int compare(QueryContext o1, QueryContext o2) { - - Long submitTimeO1 = o1.getSubmissionTime(); - Long submitTimeO2 = o2.getSubmissionTime(); - - return submitTimeO1.compareTo(submitTimeO2); - } -} http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java b/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java index a540c3c..3ab3aef 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/LensServerDAO.java @@ -28,6 +28,7 @@ import java.util.List; import javax.sql.DataSource; import org.apache.lens.api.LensConf; +import org.apache.lens.api.query.FailedAttempt; import org.apache.lens.api.query.QueryHandle; import org.apache.lens.api.query.QueryStatus; import org.apache.lens.server.api.error.LensException; @@ -35,15 +36,12 @@ import org.apache.lens.server.api.query.FinishedLensQuery; import org.apache.lens.server.util.UtilityMethods; import org.apache.commons.codec.binary.Base64; -import org.apache.commons.dbutils.BasicRowProcessor; -import org.apache.commons.dbutils.BeanProcessor; -import org.apache.commons.dbutils.QueryRunner; -import org.apache.commons.dbutils.ResultSetHandler; -import org.apache.commons.dbutils.RowProcessor; +import org.apache.commons.dbutils.*; import org.apache.commons.dbutils.handlers.BeanHandler; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; +import com.google.common.collect.Lists; import lombok.extern.slf4j.Slf4j; /** @@ -93,7 +91,7 @@ public class LensServerDAO { + "metadata 
varchar(100000), " + "rows int, " + "filesize bigint, " + "errormessage varchar(10000), " + "driverstarttime bigint, " + "driverendtime bigint, " + "drivername varchar(10000), " + "queryname varchar(255), " + "submissiontime bigint, " + "driverquery varchar(1000000), " - + "conf varchar(100000))"; + + "conf varchar(100000), numfailedattempts int)"; try { QueryRunner runner = new QueryRunner(ds); runner.update(sql); @@ -102,6 +100,18 @@ public class LensServerDAO { log.warn("Unable to create finished queries table", e); } } + public void createFailedAttemptsTable() throws Exception { + String sql = "CREATE TABLE if not exists failed_attempts (handle varchar(255) not null," + + "attempt_number int, drivername varchar(10000), progress float, progressmessage varchar(10000), " + + "errormessage varchar(10000), driverstarttime bigint, driverendtime bigint)"; + try { + QueryRunner runner = new QueryRunner(ds); + runner.update(sql); + log.info("Created failed_attempts table"); + } catch (SQLException e) { + log.error("Unable to create failed_attempts table", e); + } + } /** * DAO method to insert a new Finished query into Table. 
@@ -115,14 +125,26 @@ public class LensServerDAO { // The expected case String sql = "insert into finished_queries (handle, userquery, submitter, priority, " + "starttime,endtime,result,status,metadata,rows,filesize," - + "errormessage,driverstarttime,driverendtime, drivername, queryname, submissiontime, driverquery, conf)" - + " values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; - QueryRunner runner = new QueryRunner(ds); - runner.update(sql, query.getHandle(), query.getUserQuery(), query.getSubmitter(), query.getPriority(), - query.getStartTime(), query.getEndTime(), query.getResult(), query.getStatus(), query.getMetadata(), - query.getRows(), query.getFileSize(), query.getErrorMessage(), query.getDriverStartTime(), - query.getDriverEndTime(), query.getDriverName(), query.getQueryName(), query.getSubmissionTime(), - query.getDriverQuery(), serializeConf(query.getConf())); + + "errormessage,driverstarttime,driverendtime, drivername, queryname, submissiontime, driverquery, conf, " + + "numfailedattempts)" + + " values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + Connection conn = null; + try { + conn = getConnection(); + conn.setAutoCommit(false); + QueryRunner runner = new QueryRunner(); + runner.update(conn, sql, query.getHandle(), query.getUserQuery(), query.getSubmitter(), query.getPriority(), + query.getStartTime(), query.getEndTime(), query.getResult(), query.getStatus(), query.getMetadata(), + query.getRows(), query.getFileSize(), query.getErrorMessage(), query.getDriverStartTime(), + query.getDriverEndTime(), query.getDriverName(), query.getQueryName(), query.getSubmissionTime(), + query.getDriverQuery(), serializeConf(query.getConf()), query.getFailedAttempts().size()); + for (int i = 0; i < query.getFailedAttempts().size(); i++) { + insertFailedAttempt(runner, conn, query.getHandle(), query.getFailedAttempts().get(i), i); + } + conn.commit(); + } finally { + DbUtils.closeQuietly(conn); + } } else { log.warn("Re insert happening in purge: " + 
Thread.currentThread().getStackTrace()); if (alreadyExisting.equals(query)) { @@ -135,6 +157,50 @@ public class LensServerDAO { } } } + /** + * DAO method to insert a new Finished query into Table. + * + * + * @param runner + * @param conn + *@param handle to be inserted + * @param index @throws SQLException the exception + */ + public void insertFailedAttempt(QueryRunner runner, Connection conn, String handle, FailedAttempt attempt, int index) + throws SQLException { + String sql = "insert into failed_attempts(handle, attempt_number, drivername, progress, progressmessage, " + + "errormessage, driverstarttime, driverendtime) values (?, ?, ?, ?, ?, ?, ?, ?)"; + runner.update(conn, sql, handle, index, attempt.getDriverName(), + attempt.getProgress(), attempt.getProgressMessage(), attempt.getErrorMessage(), + attempt.getDriverStartTime(), attempt.getDriverFinishTime()); + } + + public void getFailedAttempts(final FinishedLensQuery query) { + if (query != null) { + String handle = query.getHandle(); + ResultSetHandler<List<FailedAttempt>> rsh = new BeanHandler<List<FailedAttempt>>(null) { + @Override + public List<FailedAttempt> handle(ResultSet rs) throws SQLException { + List<FailedAttempt> attempts = Lists.newArrayList(); + while (rs.next()) { + FailedAttempt attempt = new FailedAttempt(rs.getString(3), rs.getDouble(4), rs.getString(5), + rs.getString(6), rs.getLong(7), rs.getLong(8)); + attempts.add(attempt); + } + return attempts; + } + }; + String sql = "select * from failed_attempts where handle=? 
order by attempt_number"; + QueryRunner runner = new QueryRunner(ds); + try { + query.setFailedAttempts(runner.query(sql, rsh, handle)); + } catch (SQLException e) { + log.error("SQL exception while executing query.", e); + } + } + } + + private String serializeConf(LensConf conf) { return Base64.encodeBase64String(conf.toXMLString().getBytes(Charset.defaultCharset())); @@ -157,7 +223,9 @@ public class LensServerDAO { String sql = "select * from finished_queries where handle=?"; QueryRunner runner = new QueryRunner(ds); try { - return runner.query(sql, rsh, handle); + FinishedLensQuery finishedQuery = runner.query(sql, rsh, handle); + getFailedAttempts(finishedQuery); + return finishedQuery; } catch (SQLException e) { log.error("SQL exception while executing query.", e); } http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/QueryComparator.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryComparator.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryComparator.java deleted file mode 100644 index 67dda6b..0000000 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryComparator.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -package org.apache.lens.server.query; - -import java.util.Comparator; - -import org.apache.lens.server.api.query.QueryContext; - -public interface QueryComparator extends Comparator<QueryContext> { -} http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/QueryCostComparator.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryCostComparator.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryCostComparator.java deleted file mode 100644 index 2702581..0000000 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryCostComparator.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.lens.server.query; - -import org.apache.lens.server.api.query.QueryContext; -import org.apache.lens.server.api.query.cost.QueryCost; - -public class QueryCostComparator extends FIFOQueryComparator { - - @Override - public int compare(final QueryContext o1, final QueryContext o2) { - - QueryCost qcO1 = o1.getSelectedDriverQueryCost(); - QueryCost qcO2 = o2.getSelectedDriverQueryCost(); - - int result = qcO1.compareTo(qcO2); - if (result == 0) { - return super.compare(o1, o2); - } - return result; - } -} - http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/QueryEndHttpNotifier.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndHttpNotifier.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndHttpNotifier.java index 5d2ddbe..e932672 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndHttpNotifier.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndHttpNotifier.java @@ -24,8 +24,8 @@ import java.util.Map; import org.apache.lens.api.query.QueryStatus; import org.apache.lens.server.api.query.QueryContext; -import org.apache.lens.server.api.query.QueryEnded; -import org.apache.lens.server.api.query.QueryEvent; +import org.apache.lens.server.api.query.events.QueryEnded; +import org.apache.lens.server.api.query.events.QueryEvent; import org.apache.lens.server.model.LogSegregationContext; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java index 91fddc9..2a34c68 
100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryEndNotifier.java @@ -40,7 +40,7 @@ import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.events.AsyncEventListener; import org.apache.lens.server.api.metrics.MetricsService; import org.apache.lens.server.api.query.QueryContext; -import org.apache.lens.server.api.query.QueryEnded; +import org.apache.lens.server.api.query.events.QueryEnded; import org.apache.lens.server.model.LogSegregationContext; import org.apache.commons.lang3.StringUtils; http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/QueryEventHttpNotifier.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryEventHttpNotifier.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryEventHttpNotifier.java index f264603..1760bec 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryEventHttpNotifier.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryEventHttpNotifier.java @@ -36,8 +36,8 @@ import org.apache.lens.api.util.MoxyJsonConfigurationContextResolver; import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.events.AsyncEventListener; import org.apache.lens.server.api.query.QueryContext; -import org.apache.lens.server.api.query.QueryEnded; -import org.apache.lens.server.api.query.QueryEvent; +import org.apache.lens.server.api.query.events.QueryEnded; +import org.apache.lens.server.api.query.events.QueryEvent; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java 
---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java index cb5961f..b5e996f 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionServiceImpl.java @@ -50,8 +50,6 @@ import org.apache.lens.server.BaseLensService; import org.apache.lens.server.LensServerConf; import org.apache.lens.server.LensServices; import org.apache.lens.server.api.LensConfConstants; -import org.apache.lens.server.api.common.BackOffRetryHandler; -import org.apache.lens.server.api.common.OperationRetryHandlerFactory; import org.apache.lens.server.api.driver.*; import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.error.LensMultiCauseException; @@ -62,8 +60,11 @@ import org.apache.lens.server.api.metrics.MethodMetricsFactory; import org.apache.lens.server.api.metrics.MetricsService; import org.apache.lens.server.api.query.*; import org.apache.lens.server.api.query.collect.WaitingQueriesSelectionPolicy; +import org.apache.lens.server.api.query.comparators.*; import org.apache.lens.server.api.query.constraint.QueryLaunchingConstraint; import org.apache.lens.server.api.query.cost.QueryCost; +import org.apache.lens.server.api.query.events.*; +import org.apache.lens.server.api.retry.*; import org.apache.lens.server.api.util.LensUtil; import org.apache.lens.server.model.LogSegregationContext; import org.apache.lens.server.model.MappedDiagnosticLogSegregationContext; @@ -235,7 +236,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE /** * The query comparator */ - private QueryComparator queryComparator; + private Comparator<QueryContext> queryComparator; /** * The result sets. 
*/ @@ -295,7 +296,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE * */ private final ReentrantLock removalFromLaunchedQueriesLock = new ReentrantLock(); - private final ExecutorService waitingQueriesSelectionSvc = Executors.newSingleThreadExecutor(); + private final ScheduledExecutorService waitingQueriesSelectionSvc = Executors.newSingleThreadScheduledExecutor(); /** * This is the TTL millis for all result sets of type {@link org.apache.lens.server.api.driver.InMemoryResultSet} @@ -321,7 +322,8 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE private UserQueryToCubeQueryRewriter userQueryToCubeQueryRewriter; // Exponential backoff retry handler for status updates - private BackOffRetryHandler statusUpdateRetryHandler; + private BackOffRetryHandler<StatusUpdateFailureContext> statusUpdateRetryHandler; + private RetryPolicyDecider<QueryContext> queryRetryPolicyDecider; /** * Instantiates a new query execution service impl. @@ -376,9 +378,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE * * @throws LensException the lens exception */ - private void loadDriversAndSelector() throws LensException { - //Load all configured Drivers - loadDrivers(); + private void loadDriverSelector() throws LensException { //Load configured Driver Selector try { Class<? extends DriverSelector> driverSelectorClass = conf.getClass(DRIVER_SELECTOR_CLASS, @@ -394,14 +394,18 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE } private void loadQueryComparator() throws LensException { try { - Class<? 
extends QueryComparator> queryComparatorClass = conf.getClass(QUERY_COMPARATOR_CLASS, - QueryPriorityComparator.class, QueryComparator.class); - log.info("Using query comparator class: {}", queryComparatorClass.getCanonicalName()); - queryComparator = queryComparatorClass.newInstance(); + Class<?>[] classes = conf.getClasses(QUERY_COMPARATOR_CLASSES, + MoreRetriesFirstComparator.class, QueryPriorityComparator.class, + FIFOQueryComparator.class, QueryCostComparator.class); + List<Comparator<QueryContext>> comparators = Lists.newArrayList(); + for (Class<?> clazz: classes) { + comparators.add(clazz.asSubclass(QueryComparator.class).newInstance()); + } + queryComparator = new ChainedComparator<>(comparators); } catch (Exception e) { - throw new LensException("Couldn't instantiate query comparator class. Class name: " - + conf.get(QUERY_COMPARATOR_CLASS) + ". Please supply a valid value for " - + QUERY_COMPARATOR_CLASS); + throw new LensException("Couldn't instantiate query comparator class. Classes: " + + conf.get(QUERY_COMPARATOR_CLASSES) + ". Please supply a valid value for " + + QUERY_COMPARATOR_CLASSES); } } @@ -681,9 +685,8 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE Thread.sleep(100); continue; } - QueryContext query = queuedQueries.take(); + final QueryContext query = queuedQueries.take(); synchronized (query) { - /* Setting log segregation id */ logSegregationContext.setLogSegragationAndQueryId(query.getQueryHandleString()); @@ -714,6 +717,17 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE launched queries. 
First add to waiting queries, then release lock */ addToWaitingQueries(query); removalFromLaunchedQueriesLock.unlock(); + if (query.getRetryPolicy() != null) { + waitingQueriesSelectionSvc.schedule(new Runnable() { + @Override + public void run() { + if (waitingQueries.remove(query)) { + queuedQueries.add(query); + } + } + }, query.getRetryPolicy().getOperationNextTime(query) - System.currentTimeMillis(), + TimeUnit.MILLISECONDS); + } } } finally { if (removalFromLaunchedQueriesLock.isHeldByCurrentThread()) { @@ -881,6 +895,43 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE log.info("StatusPoller exited"); } } + private boolean handleRetries(QueryContext ctx) throws LensException { + // TODO: handle retries for post-processing, e.g. result formatting failure doesn't need query rerun + if (ctx.getStatus().failing()) { + if (removeFromLaunchedQueries(ctx)) { + processWaitingQueriesAsync(ctx); + } + if (ctx.getDriverStatus().failed() && !getDriverRetryPolicy(ctx).hasExhaustedRetries(ctx)) { + log.info("query {} will be retried on the same driver {}", + ctx.getQueryHandle(), ctx.getSelectedDriver().getFullyQualifiedName()); + ctx.extractFailedAttempt(); + ctx.setStatus(QueryStatus.getQueuedStatus()); + ctx.getSelectedDriver().closeQuery(ctx.getQueryHandle()); + return queuedQueries.add(ctx); + } else if (!getServerRetryPolicy(ctx).hasExhaustedRetries(ctx)) { + LensDriver selectedDriver = ctx.getSelectedDriver(); + ctx.getDriverContext().blacklist(selectedDriver); + try (SessionContext ignored = new SessionContext(getSessionHandle(ctx.getLensSessionIdentifier()))) { + rewriteAndSelect(ctx); + } catch (LensException e) { + log.error("driver {} gave up on query {} and it will not be retried on any other driver since rewrite failed", + selectedDriver.getFullyQualifiedName(), e); + ctx.setStatus(new QueryStatus(1.0f, null, FAILED, ctx.getStatus().getStatusMessage(), false, null, + ctx.getStatus().getErrorMessage(), 
ctx.getStatus().getLensErrorTO())); + return false; + } + log.info("driver {} gave up on query {} and it will be retried on {}", selectedDriver.getFullyQualifiedName(), + ctx.getQueryHandle(), ctx.getSelectedDriver().getFullyQualifiedName()); + ctx.extractFailedAttempt(selectedDriver); + ctx.setStatus(QueryStatus.getQueuedStatus()); + selectedDriver.closeQuery(ctx.getQueryHandle()); + return queuedQueries.add(ctx); + } + ctx.setStatus(new QueryStatus(1.0f, null, FAILED, ctx.getStatus().getStatusMessage(), false, null, + ctx.getStatus().getErrorMessage(), ctx.getStatus().getLensErrorTO())); + } + return false; + } /** * Sets the failed status. @@ -891,13 +942,33 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE * @throws LensException the lens exception */ void setFailedStatus(QueryContext ctx, String statusMsg, Exception e) throws LensException { - QueryStatus before = ctx.getStatus(); - ctx.setStatus(new QueryStatus(0.0f, null, FAILED, statusMsg, false, null, LensUtil.getCauseMessage(e), + ctx.setStatus(new QueryStatus(0.0f, null, FAILING, statusMsg, false, null, LensUtil.getCauseMessage(e), e instanceof LensException ? 
((LensException)e).buildLensErrorTO(this.errorCollection) : null)); - updateFinishedQuery(ctx, before); + handleRetries(ctx); + if (ctx.finished()) { + updateFinishedQuery(ctx, before); + } fireStatusChangeEvent(ctx, ctx.getStatus(), before); } + + private BackOffRetryHandler<QueryContext> getServerRetryPolicy(QueryContext ctx) { + if (ctx.getServerRetryPolicy() == null) { + // allow new driver to retry + ctx.setDriverRetryPolicy(null); + ctx.setServerRetryPolicy(queryRetryPolicyDecider.decidePolicy(ctx.getStatus().getErrorMessage())); + } + return ctx.getServerRetryPolicy(); + } + + private BackOffRetryHandler<QueryContext> getDriverRetryPolicy(QueryContext ctx) { + if (ctx.getDriverRetryPolicy() == null) { + ctx.setDriverRetryPolicy(ctx.getSelectedDriver().getRetryPolicyDecider() + .decidePolicy(ctx.getDriverStatus().getErrorMessage())); + } + return ctx.getDriverRetryPolicy(); + } + /** * Sets the cancelled status. * @@ -984,6 +1055,9 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE || !ctx.isResultAvailableInDriver())) { setSuccessState(ctx); } else { + if (ctx.getStatus().failing()) { + handleRetries(ctx); + } if (ctx.getStatus().finished()) { updateFinishedQuery(ctx, before); } @@ -1029,7 +1103,11 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE case LAUNCHED: return new QueryLaunched(ctx.getLaunchTime(), prevState, currState, query); case QUEUED: - return new QueryQueued(ctx.getSubmissionTime(), prevState, currState, query, ctx.getSubmittedUser()); + if (ctx.getFailedAttempts().size() > 0) { + return new QueryQueuedForRetry(ctx.getSubmissionTime(), prevState, currState, query, ctx.getSubmittedUser()); + } else { + return new QueryQueued(ctx.getSubmissionTime(), prevState, currState, query, ctx.getSubmittedUser()); + } case RUNNING: return new QueryRunning(System.currentTimeMillis() - ctx.getDriverStatus().getDriverStartTime(), prevState, currState, query); @@ -1239,13 +1317,21 @@ public 
class QueryExecutionServiceImpl extends BaseLensService implements QueryE throw new IllegalStateException("Could not load phase 1 rewriters"); } try { + loadQueryRetryPolicyDecider(conf); + } catch (LensException e) { + throw new IllegalStateException("Could not load retry policy", e); + } + try { initializeQueryAcceptors(); } catch (LensException e) { throw new IllegalStateException("Could not load acceptors"); } initializeListeners(); try { - loadDriversAndSelector(); + // Load all configured Drivers + loadDrivers(); + // load driver selector + loadDriverSelector(); } catch (LensException e) { log.error("Error while loading drivers", e); throw new IllegalStateException("Could not load drivers", e); @@ -1269,6 +1355,10 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE log.info("Query execution service initialized"); } + private void loadQueryRetryPolicyDecider(Configuration conf) throws LensException { + this.queryRetryPolicyDecider = ChainedRetryPolicyDecider.from(conf, QUERY_RETRY_POLICY_CLASSES); + } + /** * Initalize finished query store. 
* @@ -1279,8 +1369,9 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE this.lensServerDao.init(conf); try { this.lensServerDao.createFinishedQueriesTable(); + this.lensServerDao.createFailedAttemptsTable(); } catch (Exception e) { - log.warn("Unable to create finished query table, query purger will not purge queries", e); + log.warn("Unable to create finished query tables, query purger will not purge queries", e); } } @@ -1429,15 +1520,16 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE log.info("Recovered {} queries", allQueries.size()); } super.start(); - querySubmitter.start(); - statusPoller.start(); - queryPurger.start(); - prepareQueryPurger.start(); startEstimatePool(); startLauncherPool(); startQueryCancellationPool(); + querySubmitter.start(); + statusPoller.start(); + queryPurger.start(); + prepareQueryPurger.start(); + if (conf.getBoolean(RESULTSET_PURGE_ENABLED, DEFAULT_RESULTSET_PURGE_ENABLED)) { queryResultPurger = new QueryResultPurger(); queryResultPurger.init(conf); @@ -1598,8 +1690,8 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE // Evaluate success of rewrite and estimate boolean succeededOnce = false; - List<String> failureCauses = new ArrayList<String>(numDrivers); - List<LensException> causes = new ArrayList<LensException>(numDrivers); + List<String> failureCauses = new ArrayList<>(numDrivers); + List<LensException> causes = new ArrayList<>(numDrivers); for (RewriteEstimateRunnable r : runnables) { if (r.isSucceeded()) { @@ -2087,7 +2179,7 @@ public class QueryExecutionServiceImpl extends BaseLensService implements QueryE private QueryHandle submitQuery(final QueryContext ctx) throws LensException { synchronized (ctx) { QueryStatus before = ctx.getStatus(); - ctx.setStatus(new QueryStatus(0.0, null, QUEUED, "Query is queued", false, null, null, null)); + ctx.setStatus(QueryStatus.getQueuedStatus()); queuedQueries.add(ctx); log.info("Added 
to Queued Queries:{}", ctx.getQueryHandleString()); allQueries.put(ctx.getQueryHandle(), ctx); http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java index 55cabe2..557daa2 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/QueryExecutionStatisticsGenerator.java @@ -24,7 +24,7 @@ import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.events.AsyncEventListener; import org.apache.lens.server.api.events.LensEventService; import org.apache.lens.server.api.query.QueryContext; -import org.apache.lens.server.api.query.QueryEnded; +import org.apache.lens.server.api.query.events.QueryEnded; import org.apache.lens.server.stats.event.query.QueryDriverStatistics; import org.apache.lens.server.stats.event.query.QueryExecutionStatistics; http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/QueryPriorityComparator.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/QueryPriorityComparator.java b/lens-server/src/main/java/org/apache/lens/server/query/QueryPriorityComparator.java deleted file mode 100644 index 2c6d904..0000000 --- a/lens-server/src/main/java/org/apache/lens/server/query/QueryPriorityComparator.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.lens.server.query; - -import org.apache.lens.api.Priority; -import org.apache.lens.server.api.query.QueryContext; - -public class QueryPriorityComparator extends FIFOQueryComparator { - - @Override - public int compare(final QueryContext o1, final QueryContext o2) { - - Priority pO1 = o1.getPriority(); - Priority pO2 = o2.getPriority(); - - int result = pO1.compareTo(pO2); - if (result == 0) { - return super.compare(o1, o2); - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java index 41cf33b..c7dc0e1 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/ResultFormatter.java @@ -28,6 +28,7 @@ import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.events.AsyncEventListener; import org.apache.lens.server.api.metrics.MetricsService; import org.apache.lens.server.api.query.*; +import 
org.apache.lens.server.api.query.events.QueryExecuted; import org.apache.lens.server.model.LogSegregationContext; import org.apache.hadoop.fs.FileSystem; http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/constraint/DefaultQueryLaunchingConstraintsChecker.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/constraint/DefaultQueryLaunchingConstraintsChecker.java b/lens-server/src/main/java/org/apache/lens/server/query/constraint/DefaultQueryLaunchingConstraintsChecker.java index 48291b9..cf117dc 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/constraint/DefaultQueryLaunchingConstraintsChecker.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/constraint/DefaultQueryLaunchingConstraintsChecker.java @@ -19,11 +19,13 @@ package org.apache.lens.server.query.constraint; +import java.util.Collections; import java.util.Set; import org.apache.lens.server.api.query.QueryContext; import org.apache.lens.server.api.query.collect.EstimatedImmutableQueryCollection; import org.apache.lens.server.api.query.constraint.QueryLaunchingConstraint; +import org.apache.lens.server.api.retry.BackOffRetryHandler; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; @@ -44,7 +46,7 @@ public class DefaultQueryLaunchingConstraintsChecker implements QueryLaunchingCo private final ImmutableSet<QueryLaunchingConstraint> lensQueryConstraints; public DefaultQueryLaunchingConstraintsChecker( - @NonNull final ImmutableSet<QueryLaunchingConstraint> lensQueryConstraints) { + @NonNull final ImmutableSet<QueryLaunchingConstraint> lensQueryConstraints) { this.lensQueryConstraints = lensQueryConstraints; } @@ -54,8 +56,11 @@ public class DefaultQueryLaunchingConstraintsChecker implements QueryLaunchingCo Set<QueryLaunchingConstraint> allConstraints = 
prepareAllConstraints(candidateQuery); for (QueryLaunchingConstraint queryConstraint : allConstraints) { - if (!queryConstraint.allowsLaunchOf(candidateQuery, launchedQueries)) { - log.info("query {} not allowed to launch. Constraint failed: {}", candidateQuery, queryConstraint); + String launchRejectionMessage = queryConstraint.allowsLaunchOf(candidateQuery, launchedQueries); + if (launchRejectionMessage != null) { + log.info("query {} not allowed to launch. Constraint failed: {} with message: {}", + candidateQuery, queryConstraint, launchRejectionMessage); + candidateQuery.getStatus().setProgressMessage(launchRejectionMessage); return false; } } @@ -66,6 +71,12 @@ public class DefaultQueryLaunchingConstraintsChecker implements QueryLaunchingCo Set<QueryLaunchingConstraint> prepareAllConstraints(final QueryContext candidateQuery) { ImmutableSet<QueryLaunchingConstraint> driverConstraints = candidateQuery.getSelectedDriverQueryConstraints(); - return Sets.union(this.lensQueryConstraints, driverConstraints); + BackOffRetryHandler<QueryContext> retryPolicy = candidateQuery.getRetryPolicy(); + Sets.SetView<QueryLaunchingConstraint> constraints = Sets.union(this.lensQueryConstraints, driverConstraints); + if (retryPolicy == null) { + return constraints; + } else { + return Sets.union(Collections.singleton(new RetryPolicyToConstraingAdapter(retryPolicy)), constraints); + } } } http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/constraint/RetryPolicyToConstraingAdapter.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/constraint/RetryPolicyToConstraingAdapter.java b/lens-server/src/main/java/org/apache/lens/server/query/constraint/RetryPolicyToConstraingAdapter.java new file mode 100644 index 0000000..e0d6d80 --- /dev/null +++ 
b/lens-server/src/main/java/org/apache/lens/server/query/constraint/RetryPolicyToConstraingAdapter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.server.query.constraint; + + +import org.apache.lens.server.api.query.QueryContext; +import org.apache.lens.server.api.query.collect.EstimatedImmutableQueryCollection; +import org.apache.lens.server.api.query.constraint.QueryLaunchingConstraint; +import org.apache.lens.server.api.retry.BackOffRetryHandler; + +import lombok.Data; + +@Data +public class RetryPolicyToConstraingAdapter implements QueryLaunchingConstraint { + private final BackOffRetryHandler<QueryContext> constraint; + @Override + public String allowsLaunchOf(QueryContext candidateQuery, EstimatedImmutableQueryCollection launchedQueries) { + if (!constraint.canTryOpNow(candidateQuery)) { + return "Query will be automatically re-attempted in " + + (constraint.getOperationNextTime(candidateQuery) - System.currentTimeMillis())/1000 + " seconds"; + } + return null; + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/query/constraint/TotalQueryCostCeilingConstraint.java 
---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/query/constraint/TotalQueryCostCeilingConstraint.java b/lens-server/src/main/java/org/apache/lens/server/query/constraint/TotalQueryCostCeilingConstraint.java index 0a8d4c3..a7ee737 100644 --- a/lens-server/src/main/java/org/apache/lens/server/query/constraint/TotalQueryCostCeilingConstraint.java +++ b/lens-server/src/main/java/org/apache/lens/server/query/constraint/TotalQueryCostCeilingConstraint.java @@ -57,18 +57,20 @@ public class TotalQueryCostCeilingConstraint implements QueryLaunchingConstraint * @return */ @Override - public boolean allowsLaunchOf( + public String allowsLaunchOf( final QueryContext candidateQuery, final EstimatedImmutableQueryCollection launchedQueries) { if (!totalQueryCostCeilingPerUser.isPresent()) { - return true; + return null; } final String currentUser = candidateQuery.getSubmittedUser(); QueryCost totalQueryCostForCurrentUser = launchedQueries.getTotalQueryCost(currentUser); - boolean canLaunch = (totalQueryCostForCurrentUser.compareTo(totalQueryCostCeilingPerUser.get()) <= 0); - log.debug("canLaunch:{}", canLaunch); - return canLaunch; + if (totalQueryCostForCurrentUser.compareTo(totalQueryCostCeilingPerUser.get()) > 0) { + return totalQueryCostForCurrentUser + "/" + totalQueryCostCeilingPerUser + " capacity utilized by " + + candidateQuery.getSubmittedUser(); + } + return null; } } http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/rewrite/RewriteUtil.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/rewrite/RewriteUtil.java b/lens-server/src/main/java/org/apache/lens/server/rewrite/RewriteUtil.java index 18c2f2c..b2e140b 100644 --- a/lens-server/src/main/java/org/apache/lens/server/rewrite/RewriteUtil.java +++ 
b/lens-server/src/main/java/org/apache/lens/server/rewrite/RewriteUtil.java @@ -299,7 +299,7 @@ public final class RewriteUtil { Map<LensDriver, DriverRewriterRunnable> runnables = new LinkedHashMap<>(); List<RewriteUtil.CubeQueryInfo> cubeQueries = findCubePositions(replacedQuery, ctx.getHiveConf()); - for (LensDriver driver : ctx.getDriverContext().getDrivers()) { + for (LensDriver driver : ctx.getDriverContext().getEligibleDrivers()) { runnables.put(driver, new DriverRewriterRunnable(driver, ctx, cubeQueries, replacedQuery)); } http://git-wip-us.apache.org/repos/asf/lens/blob/38ab6c60/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java ---------------------------------------------------------------------- diff --git a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java index 4192134..1c642bd 100644 --- a/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java +++ b/lens-server/src/main/java/org/apache/lens/server/scheduler/SchedulerQueryEventListener.java @@ -25,7 +25,7 @@ import org.apache.lens.api.query.QueryStatus; import org.apache.lens.api.scheduler.*; import org.apache.lens.server.api.events.AsyncEventListener; import org.apache.lens.server.api.query.QueryContext; -import org.apache.lens.server.api.query.QueryEnded; +import org.apache.lens.server.api.query.events.QueryEnded; import lombok.extern.slf4j.Slf4j;
