http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java index 6eb8e16..d8495cc 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/client/impl/DefaultClient.java @@ -1,21 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.client.impl; import com.datastax.driver.core.Cluster; @@ -24,13 +18,12 @@ import com.datastax.driver.core.Metadata; import com.datastax.driver.core.Session; import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.google.common.base.Preconditions; -import org.apache.storm.cassandra.client.SimpleClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.Closeable; import java.io.Serializable; import java.util.Set; +import org.apache.storm.cassandra.client.SimpleClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Simple class to wrap cassandra {@link com.datastax.driver.core.Cluster} instance.
http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BaseBeanFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BaseBeanFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BaseBeanFactory.java index b528223..0c1964e 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BaseBeanFactory.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BaseBeanFactory.java @@ -1,21 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.context; import java.util.Map; @@ -28,6 +22,7 @@ public abstract class BaseBeanFactory<T> implements BeanFactory<T> { protected WorkerCtx context; protected volatile T instance; + /** * {@inheritDoc} */ @@ -41,13 +36,15 @@ public abstract class BaseBeanFactory<T> implements BeanFactory<T> { */ @Override public synchronized T get(Map<String, Object> topoConf) { - if( instance != null) return instance; + if (instance != null) return instance; return instance = make(topoConf); } + /** * Return a new instance of T. */ protected abstract T make(final Map<String, Object> topoConf); + /** * {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BeanFactory.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BeanFactory.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BeanFactory.java index b573b51..fb712a5 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BeanFactory.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/BeanFactory.java @@ -1,21 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.context; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/WorkerCtx.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/WorkerCtx.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/WorkerCtx.java index 15668d1..1a457f9 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/WorkerCtx.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/context/WorkerCtx.java @@ -1,21 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.context; import java.io.Serializable; @@ -54,7 +48,7 @@ public class WorkerCtx implements Serializable { */ protected <T> BeanFactory<T> getBeanfactory(Class<T> clazz) { BeanFactory<T> factory = (BeanFactory<T>) this.componentCtx.get(clazz); - if( factory == null) throw new RuntimeException("Cannot resolve bean factory for class : " + clazz.getCanonicalName()); + if (factory == null) throw new RuntimeException("Cannot resolve bean factory for class : " + clazz.getCanonicalName()); factory.setStormContext(this); return factory; } @@ -64,7 +58,7 @@ public class WorkerCtx implements Serializable { * @return */ public <T, K, V> T getWorkerBean(Class<T> clazz, Map<K, V> topoConf) { - return getWorkerBean(clazz, topoConf,false); + return getWorkerBean(clazz, topoConf, false); } /** @@ -77,13 +71,13 @@ public class WorkerCtx implements Serializable { * @return a instance of type {@link T}. */ public <T, K, V> T getWorkerBean(Class<T> clazz, Map<K, V> topoConf, boolean force) { - if( force ) workerCtx.remove(clazz); - BeanFactory<T> factory = (BeanFactory<T>) this.workerCtx.get(clazz); - if( factory == null) { + if (force) workerCtx.remove(clazz); + BeanFactory<T> factory = (BeanFactory<T>) this.workerCtx.get(clazz); + if (factory == null) { BeanFactory<T> instance = getBeanfactory(clazz).newInstance(); workerCtx.putIfAbsent(clazz, instance); factory = (BeanFactory<T>) this.workerCtx.get(clazz); } - return factory.get((Map<String, Object>)topoConf); + return factory.get((Map<String, Object>) topoConf); } } http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java index 63b81fe..53e6ee6 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutor.java @@ -1,21 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.executor; import com.datastax.driver.core.ResultSet; @@ -27,10 +21,6 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.storm.topology.FailedException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; @@ -39,6 +29,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.storm.topology.FailedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Service to asynchronously executes cassandra statements. @@ -69,7 +62,7 @@ public class AsyncExecutor<T> implements Serializable { * @param executorService The executor service responsible to execute handler. */ private AsyncExecutor(Session session, ExecutorService executorService, AsyncResultHandler<T> handler) { - this.session = session; + this.session = session; this.executorService = executorService; this.handler = handler; } @@ -86,11 +79,11 @@ public class AsyncExecutor<T> implements Serializable { List<SettableFuture<T>> settableFutures = new ArrayList<>(statements.size()); - for(Statement s : statements) + for (Statement s : statements) settableFutures.add(execAsync(s, input, AsyncResultHandler.NO_OP_HANDLER)); ListenableFuture<List<T>> allAsList = Futures.allAsList(settableFutures); - Futures.addCallback(allAsList, new FutureCallback<List<T>>(){ + Futures.addCallback(allAsList, new FutureCallback<List<T>>() { @Override public void onSuccess(List<T> inputs) { handler.success(input); @@ -147,7 +140,8 @@ public class AsyncExecutor<T> implements Serializable { * Asynchronously executes the specified select statements. Results will be passed to the {@link AsyncResultSetHandler} * once each query has succeed or failed. */ - public SettableFuture<List<T>> execAsync(final List<Statement> statements, final List<T> inputs, Semaphore throttle, final AsyncResultSetHandler<T> handler) { + public SettableFuture<List<T>> execAsync(final List<Statement> statements, final List<T> inputs, Semaphore throttle, + final AsyncResultSetHandler<T> handler) { final SettableFuture<List<T>> settableFuture = SettableFuture.create(); if (inputs.size() == 0) { @@ -184,11 +178,10 @@ public class AsyncExecutor<T> implements Serializable { handler.failure(throwable, input); } catch (Throwable throwable2) { asyncContext.exception(throwable2); - } - finally { + } finally { asyncContext - .exception(throwable) - .release(); + .exception(throwable) + .release(); pending.decrementAndGet(); LOG.error(String.format("Failed to execute statement '%s' ", statement), throwable); } @@ -196,7 +189,7 @@ public class AsyncExecutor<T> implements Serializable { }, executorService); } catch (Throwable throwable) { asyncContext.exception(throwable) - .release(); + .release(); pending.decrementAndGet(); break; } @@ -207,12 +200,26 @@ public class AsyncExecutor<T> implements Serializable { return settableFuture; } + /** + * Returns the number of currently executed tasks which are not yet completed. + */ + public int getPendingTasksSize() { + return this.pending.intValue(); + } + + public void shutdown() { + if (!executorService.isShutdown()) { + LOG.info("shutting down async handler executor"); + this.executorService.shutdownNow(); + } + } + private static class AsyncContext<T> { private final List<T> inputs; private final SettableFuture<List<T>> future; - private final AtomicInteger latch; - private final List<Throwable> exceptions; - private final Semaphore throttle; + private final AtomicInteger latch; + private final List<Throwable> exceptions; + private final Semaphore throttle; public AsyncContext(List<T> inputs, Semaphore throttle, SettableFuture<List<T>> settableFuture) { this.inputs = inputs; @@ -238,8 +245,7 @@ public class AsyncExecutor<T> implements Serializable { if (remaining == 0) { if (exceptions.size() == 0) { future.set(inputs); - } - else { + } else { future.setException(new MultiFailedException(exceptions)); } @@ -254,20 +260,6 @@ public class AsyncExecutor<T> implements Serializable { } } - /** - * Returns the number of currently executed tasks which are not yet completed. - */ - public int getPendingTasksSize() { - return this.pending.intValue(); - } - - public void shutdown( ) { - if( ! executorService.isShutdown() ) { - LOG.info("shutting down async handler executor"); - this.executorService.shutdownNow(); - } - } - public static class MultiFailedException extends FailedException { private final List<Throwable> exceptions; @@ -280,12 +272,12 @@ public class AsyncExecutor<T> implements Serializable { int top5 = Math.min(exceptions.size(), 5); StringBuilder sb = new StringBuilder(); sb.append("First ") - .append(top5) - .append(" exceptions: ") - .append(System.lineSeparator()); + .append(top5) + .append(" exceptions: ") + .append(System.lineSeparator()); for (int i = 0; i < top5; i++) { sb.append(exceptions.get(i).getMessage()) - .append(System.lineSeparator()); + .append(System.lineSeparator()); } return sb.toString(); } @@ -295,13 +287,13 @@ public class AsyncExecutor<T> implements Serializable { StringBuilder sb = new StringBuilder(); sb.append(getMessage()) - .append(System.lineSeparator()) - .append("Multiple exceptions encountered: ") - .append(System.lineSeparator()); + .append(System.lineSeparator()) + .append("Multiple exceptions encountered: ") + .append(System.lineSeparator()); for (Throwable exception : exceptions) { sb.append(exception.toString()) - .append(System.lineSeparator()); + .append(System.lineSeparator()); } return super.toString(); http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java index f4b7277..62df53c 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncExecutorProvider.java @@ -1,21 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.executor; import com.datastax.driver.core.Session; @@ -32,7 +26,7 @@ public class AsyncExecutorProvider { */ public static <T> AsyncExecutor getLocal(Session session, AsyncResultHandler<T> handler) { AsyncExecutor<T> executor = localAsyncExecutor.<T>get(); - if( executor == null ) { + if (executor == null) { localAsyncExecutor.set(executor = new AsyncExecutor<>(session, handler)); } return executor; http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultHandler.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultHandler.java index f827d45..2f8766e 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultHandler.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultHandler.java @@ -1,26 +1,19 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.cassandra.executor; -import org.apache.storm.task.OutputCollector; +package org.apache.storm.cassandra.executor; import java.io.Serializable; +import org.apache.storm.task.OutputCollector; /** * Default handler for batch asynchronous execution. @@ -57,8 +50,8 @@ public interface AsyncResultHandler<T> extends Serializable { * * @param inputs The input tuple proceed. */ - void success(T inputs) ; + void success(T inputs); void flush(OutputCollector collector); - + } http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultSetHandler.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultSetHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultSetHandler.java index 8ccb400..7a2159d 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultSetHandler.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/AsyncResultSetHandler.java @@ -1,25 +1,18 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.executor; import com.datastax.driver.core.ResultSet; - import java.io.Serializable; /** @@ -53,6 +46,6 @@ public interface AsyncResultSetHandler<T> extends Serializable { * * @param inputs The input tuple proceed. */ - void success(T inputs, ResultSet resultSet) ; + void success(T inputs, ResultSet resultSet); } http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/ExecutionResultCollector.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/ExecutionResultCollector.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/ExecutionResultCollector.java index 882aeb4..44721ad 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/ExecutionResultCollector.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/ExecutionResultCollector.java @@ -1,29 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.executor; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.tuple.Tuple; import com.google.common.collect.Lists; -import org.apache.storm.cassandra.ExecutionResultHandler; - import java.util.List; +import org.apache.storm.cassandra.ExecutionResultHandler; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Tuple; /** * This class is responsible to collect input tuples proceed. @@ -41,7 +34,7 @@ public interface ExecutionResultCollector { * @param input the input tuple. */ public SucceedCollector(Tuple input) { - this(Lists.newArrayList(input)); + this(Lists.newArrayList(input)); } /** @@ -58,8 +51,8 @@ public interface ExecutionResultCollector { */ @Override public void handle(OutputCollector collector, ExecutionResultHandler handler) { - for(Tuple t : inputs) handler.onQuerySuccess(collector, t); - for(Tuple t : inputs) collector.ack(t); + for (Tuple t : inputs) handler.onQuerySuccess(collector, t); + for (Tuple t : inputs) collector.ack(t); } } @@ -88,7 +81,8 @@ public interface ExecutionResultCollector { } /** - * Calls {@link ExecutionResultHandler#onThrowable(Throwable, org.apache.storm.task.OutputCollector, org.apache.storm.tuple.Tuple)} . + * Calls + * {@link ExecutionResultHandler#onThrowable(Throwable, org.apache.storm.task.OutputCollector, org.apache.storm.tuple.Tuple)} . */ @Override public void handle(OutputCollector collector, ExecutionResultHandler handler) { http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/BatchAsyncResultHandler.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/BatchAsyncResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/BatchAsyncResultHandler.java index f7a8fcc..99a27ef 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/BatchAsyncResultHandler.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/BatchAsyncResultHandler.java @@ -1,31 +1,24 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.executor.impl; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.tuple.Tuple; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.storm.cassandra.ExecutionResultHandler; import org.apache.storm.cassandra.executor.AsyncResultHandler; import org.apache.storm.cassandra.executor.ExecutionResultCollector; - -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Tuple; public class BatchAsyncResultHandler implements AsyncResultHandler<List<Tuple>> { @@ -51,6 +44,7 @@ public class BatchAsyncResultHandler implements AsyncResultHandler<List<Tuple>> public void failure(Throwable t, List<Tuple> input) { completed.offer(new ExecutionResultCollector.FailedCollector(input, t)); } + /** * This method is responsible for acknowledging specified inputs. * @@ -66,7 +60,7 @@ public class BatchAsyncResultHandler implements AsyncResultHandler<List<Tuple>> @Override public void flush(final OutputCollector collector) { ExecutionResultCollector poll; - while( (poll = completed.poll()) != null ) { + while ((poll = completed.poll()) != null) { poll.handle(collector, handler); } } http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/SingleAsyncResultHandler.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/SingleAsyncResultHandler.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/SingleAsyncResultHandler.java index ac79543..d7cc19b 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/SingleAsyncResultHandler.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/executor/impl/SingleAsyncResultHandler.java @@ -1,30 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.executor.impl; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.tuple.Tuple; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.storm.cassandra.ExecutionResultHandler; import org.apache.storm.cassandra.executor.AsyncResultHandler; import org.apache.storm.cassandra.executor.ExecutionResultCollector; - -import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Tuple; public class SingleAsyncResultHandler implements AsyncResultHandler<Tuple> { @@ -50,6 +43,7 @@ public class SingleAsyncResultHandler implements AsyncResultHandler<Tuple> { public void failure(Throwable t, Tuple input) { completed.offer(new ExecutionResultCollector.FailedCollector(input, t)); } + /** * This method is responsible for acknowledging specified inputs. * @@ -65,7 +59,7 @@ public class SingleAsyncResultHandler implements AsyncResultHandler<Tuple> { @Override public void flush(final OutputCollector collector) { ExecutionResultCollector poll; - while( (poll = completed.poll()) != null ) { + while ((poll = completed.poll()) != null) { poll.handle(collector, handler); } } http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/AyncCQLResultSetValuesMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/AyncCQLResultSetValuesMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/AyncCQLResultSetValuesMapper.java index 9b92b99..f57a99c 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/AyncCQLResultSetValuesMapper.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/AyncCQLResultSetValuesMapper.java @@ -1,30 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.query; import com.datastax.driver.core.Session; import com.datastax.driver.core.Statement; -import org.apache.storm.tuple.ITuple; -import org.apache.storm.tuple.Values; - import java.io.Serializable; import java.util.List; +import org.apache.storm.tuple.ITuple; +import org.apache.storm.tuple.Values; /** * A resultset mapper that http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java index 2a8135d..9cbac15 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/BaseCQLStatementTupleMapper.java @@ -1,31 +1,24 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.query; -import org.apache.storm.tuple.ITuple; import com.datastax.driver.core.Session; import com.datastax.driver.core.Statement; - import java.io.Serializable; import java.util.Arrays; import java.util.List; import java.util.Map; +import org.apache.storm.tuple.ITuple; /** * Default interface to map a {@link org.apache.storm.tuple.ITuple} to a CQL {@link com.datastax.driver.core.Statement}. http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLResultSetValuesMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLResultSetValuesMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLResultSetValuesMapper.java index d5495fb..2d50161 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLResultSetValuesMapper.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLResultSetValuesMapper.java @@ -1,30 +1,23 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.query; -import org.apache.storm.tuple.ITuple; -import org.apache.storm.tuple.Values; import com.datastax.driver.core.Session; import com.datastax.driver.core.Statement; - import java.io.Serializable; import java.util.List; +import org.apache.storm.tuple.ITuple; +import org.apache.storm.tuple.Values; /** * http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java index 9ddb58a..c12aad0 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementBuilder.java @@ -1,21 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.query; import java.io.Serializable; http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java index 1048591..ccc8bc8 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CQLStatementTupleMapper.java @@ -1,31 +1,24 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.query; -import org.apache.storm.tuple.ITuple; import com.datastax.driver.core.Session; import com.datastax.driver.core.Statement; - import java.io.Serializable; import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.apache.storm.tuple.ITuple; /** * Default interface to map a {@link org.apache.storm.tuple.ITuple} to a CQL {@link com.datastax.driver.core.Statement}. @@ -52,7 +45,7 @@ public interface CQLStatementTupleMapper extends Serializable { @Override public List<Statement> map(Map<String, Object> conf, Session session, ITuple tuple) { List<Statement> statements = new LinkedList<>(); - for(CQLStatementBuilder b : builders) { + for (CQLStatementBuilder b : builders) { statements.addAll(b.build().map(conf, session, tuple)); } return statements; http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java index 06643a5..dd94c28 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/Column.java @@ -1,21 +1,15 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.query; import java.io.Serializable; @@ -35,6 +29,13 @@ public class Column<T> implements Serializable { this.val = val; } + public static Object[] getVals(List<Column> columns) { + List<Object> vals = new ArrayList<>(columns.size()); + for (Column c : columns) + vals.add(c.getVal()); + return vals.toArray(); + } + public String getColumnName() { return columnName; } @@ -64,11 +65,4 @@ public class Column<T> implements Serializable { result = 31 * result + (val != null ? val.hashCode() : 0); return result; } - - public static Object[] getVals(List<Column> columns) { - List<Object> vals = new ArrayList<>(columns.size()); - for(Column c : columns) - vals.add(c.getVal()); - return vals.toArray(); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ContextQuery.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ContextQuery.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ContextQuery.java index 58c7984..bab39eb 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ContextQuery.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ContextQuery.java @@ -1,27 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.cassandra.query; -import org.apache.storm.tuple.ITuple; +package org.apache.storm.cassandra.query; import java.io.Serializable; import java.util.Map; +import org.apache.storm.tuple.ITuple; /** * This interface may be used to retrieve a cassandra bound query either from storm config http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java index c497f3e..538da8d 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/CqlMapper.java @@ -1,29 +1,22 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ -package org.apache.storm.cassandra.query; -import org.apache.storm.tuple.ITuple; -import org.apache.storm.cassandra.query.selector.FieldSelector; +package org.apache.storm.cassandra.query; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import org.apache.storm.cassandra.query.selector.FieldSelector; +import org.apache.storm.tuple.ITuple; /** * Default interface to defines how a storm tuple maps to a list of columns representing a row in a database. @@ -56,7 +49,7 @@ public interface CqlMapper extends Serializable { @Override public List<Column> map(ITuple tuple) { List<Column> columns = new ArrayList<>(selectors.size()); - for(FieldSelector selector : selectors) + for (FieldSelector selector : selectors) columns.add(selector.select(tuple)); return columns; } @@ -78,7 +71,7 @@ public interface CqlMapper extends Serializable { @Override public List<Column> map(ITuple tuple) { List<Column> columns = new ArrayList<>(tuple.size()); - for(String name : tuple.getFields()) + for (String name : tuple.getFields()) columns.add(new Column(name, tuple.getValueByField(name))); return columns; } http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ObjectMapperOperation.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ObjectMapperOperation.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ObjectMapperOperation.java index 29d7ce9..f6660e5 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ObjectMapperOperation.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/ObjectMapperOperation.java @@ -1,20 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.cassandra.query; http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java index a144830..271ec3d 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/BoundCQLStatementMapperBuilder.java @@ -1,23 +1,20 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.query.builder; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; import org.apache.storm.cassandra.query.CQLStatementBuilder; import org.apache.storm.cassandra.query.ContextQuery; import org.apache.storm.cassandra.query.CqlMapper; @@ -26,13 +23,9 @@ import org.apache.storm.cassandra.query.impl.PreparedStatementBinder; import org.apache.storm.cassandra.query.impl.RoutingKeyGenerator; import org.apache.storm.cassandra.query.selector.FieldSelector; -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; - -import static org.apache.storm.cassandra.query.ContextQuery.*; +import static org.apache.storm.cassandra.query.ContextQuery.StaticContextQuery; -public class BoundCQLStatementMapperBuilder implements CQLStatementBuilder<BoundCQLStatementTupleMapper>, Serializable { +public class BoundCQLStatementMapperBuilder implements CQLStatementBuilder<BoundCQLStatementTupleMapper>, Serializable { private final ContextQuery contextQuery; @@ -78,7 +71,7 @@ public class BoundCQLStatementMapperBuilder implements CQLStatementBuilder<Bound return this; } - public final BoundCQLStatementMapperBuilder withRoutingKeys(String...fields) { + public final BoundCQLStatementMapperBuilder withRoutingKeys(String... fields) { this.routingKeys = Arrays.asList(fields); return this; } http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java index e418468..2044f2d 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/ObjectMapperCqlStatementMapperBuilder.java @@ -1,20 +1,13 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ package org.apache.storm.cassandra.query.builder; http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java index 29ad9c1..0349310 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/builder/SimpleCQLStatementMapperBuilder.java @@ -1,34 +1,27 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.query.builder; import com.datastax.driver.core.querybuilder.BuiltStatement; +import java.io.Serializable; +import java.util.Arrays; +import java.util.List; import org.apache.storm.cassandra.query.CQLStatementBuilder; import org.apache.storm.cassandra.query.CqlMapper; import org.apache.storm.cassandra.query.impl.RoutingKeyGenerator; import org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper; import org.apache.storm.cassandra.query.selector.FieldSelector; -import java.io.Serializable; -import java.util.Arrays; -import java.util.List; - /** * Default class to build {@link org.apache.storm.cassandra.query.impl.SimpleCQLStatementMapper} instance. */ @@ -61,7 +54,7 @@ public class SimpleCQLStatementMapperBuilder implements CQLStatementBuilder<Simp */ @Override public SimpleCQLStatementMapper build() { - return new SimpleCQLStatementMapper(queryString, mapper, (routingKeys != null) ? new RoutingKeyGenerator(routingKeys) : null ); + return new SimpleCQLStatementMapper(queryString, mapper, (routingKeys != null) ? new RoutingKeyGenerator(routingKeys) : null); } /** @@ -74,7 +67,7 @@ public class SimpleCQLStatementMapperBuilder implements CQLStatementBuilder<Simp return this; } - public final SimpleCQLStatementMapperBuilder withRoutingKeys(String...fields) { + public final SimpleCQLStatementMapperBuilder withRoutingKeys(String... fields) { this.routingKeys = Arrays.asList(fields); return this; } http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java index 132981f..8d68a1e 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BatchCQLStatementTupleMapper.java @@ -1,33 +1,26 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.query.impl; -import org.apache.storm.tuple.ITuple; import com.datastax.driver.core.BatchStatement; import com.datastax.driver.core.Session; import com.datastax.driver.core.Statement; -import org.apache.storm.cassandra.query.CQLStatementTupleMapper; - import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; +import org.apache.storm.cassandra.query.CQLStatementTupleMapper; +import org.apache.storm.tuple.ITuple; public class BatchCQLStatementTupleMapper implements CQLStatementTupleMapper { @@ -51,8 +44,8 @@ public class BatchCQLStatementTupleMapper implements CQLStatementTupleMapper { @Override public List<Statement> map(Map<String, Object> conf, Session session, ITuple tuple) { final BatchStatement batch = new BatchStatement(this.type); - for(CQLStatementTupleMapper m : mappers) + for (CQLStatementTupleMapper m : mappers) batch.addAll(m.map(conf, session, tuple)); - return Arrays.asList((Statement)batch); + return Arrays.asList((Statement) batch); } } http://git-wip-us.apache.org/repos/asf/storm/blob/1a2d131f/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java ---------------------------------------------------------------------- diff --git a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java index 04933f2..40e2c34 100644 --- a/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java +++ b/external/storm-cassandra/src/main/java/org/apache/storm/cassandra/query/impl/BoundCQLStatementTupleMapper.java @@ -1,50 +1,40 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. */ + package org.apache.storm.cassandra.query.impl; -import org.apache.storm.tuple.ITuple; import com.datastax.driver.core.PreparedStatement; import com.datastax.driver.core.Session; import com.datastax.driver.core.Statement; import com.google.common.base.Preconditions; -import org.apache.storm.cassandra.query.CQLStatementTupleMapper; -import org.apache.storm.cassandra.query.Column; -import org.apache.storm.cassandra.query.CqlMapper; -import org.apache.storm.cassandra.query.ContextQuery; - import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.storm.cassandra.query.CQLStatementTupleMapper; +import org.apache.storm.cassandra.query.Column; +import org.apache.storm.cassandra.query.ContextQuery; +import org.apache.storm.cassandra.query.CqlMapper; +import org.apache.storm.tuple.ITuple; public class BoundCQLStatementTupleMapper implements CQLStatementTupleMapper { private final ContextQuery contextQuery; private final CqlMapper mapper; - - private Map<String, PreparedStatement> cache = new HashMap<>(); - private final RoutingKeyGenerator rkGenerator; - private final PreparedStatementBinder binder; + private Map<String, PreparedStatement> cache = new HashMap<>(); /** * Creates a new {@link BoundCQLStatementTupleMapper} instance. @@ -55,7 +45,8 @@ public class BoundCQLStatementTupleMapper implements CQLStatementTupleMapper { * @param rkGenerator * @param binder */ - public BoundCQLStatementTupleMapper(ContextQuery contextQuery, CqlMapper mapper, RoutingKeyGenerator rkGenerator, PreparedStatementBinder binder) { + public BoundCQLStatementTupleMapper(ContextQuery contextQuery, CqlMapper mapper, RoutingKeyGenerator rkGenerator, + PreparedStatementBinder binder) { Preconditions.checkNotNull(contextQuery, "ContextQuery must not be null"); Preconditions.checkNotNull(mapper, "Mapper should not be null"); this.contextQuery = contextQuery; @@ -74,12 +65,13 @@ public class BoundCQLStatementTupleMapper implements CQLStatementTupleMapper { final String query = contextQuery.resolves(config, tuple); PreparedStatement statement = getPreparedStatement(session, query); - if(hasRoutingKeys()) { + if (hasRoutingKeys()) { List<ByteBuffer> keys = rkGenerator.getRoutingKeys(tuple); - if( keys.size() == 1) + if (keys.size() == 1) { statement.setRoutingKey(keys.get(0)); - else + } else { statement.setRoutingKey(keys.toArray(new ByteBuffer[keys.size()])); + } } return Arrays.asList((Statement) this.binder.apply(statement, columns)); @@ -97,7 +89,7 @@ public class BoundCQLStatementTupleMapper implements CQLStatementTupleMapper { */ private PreparedStatement getPreparedStatement(Session session, String query) { PreparedStatement statement = cache.get(query); - if( statement == null) { + if (statement == null) { statement = session.prepare(query); cache.put(query, statement); }
