http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/rpc/ThriftRPCServer.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/rpc/ThriftRPCServer.java b/tephra-core/src/main/java/co/cask/tephra/rpc/ThriftRPCServer.java deleted file mode 100644 index 432ab2f..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/rpc/ThriftRPCServer.java +++ /dev/null @@ -1,277 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package co.cask.tephra.rpc; - -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.AbstractExecutionThreadService; -import org.apache.thrift.TProcessor; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.server.TServer; -import org.apache.thrift.server.TThreadedSelectorServerWithFix; -import org.apache.thrift.transport.TFramedTransport; -import org.apache.thrift.transport.TNonblockingServerSocket; -import org.apache.twill.common.Threads; -import org.apache.twill.internal.utils.Networks; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.InvocationHandler; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.lang.reflect.Proxy; -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * @param <T> The type of service handler interface. - * @param <I> The type of the thrift service. - */ -public final class ThriftRPCServer<T extends RPCServiceHandler, I> extends AbstractExecutionThreadService { - - private static final Logger LOG = LoggerFactory.getLogger(ThriftRPCServer.class); - - private final String name; - private final int ioThreads; - private final int workerThreads; - private final int maxReadBufferBytes; - private final T serviceHandler; - private final TProcessor processor; - - private InetSocketAddress bindAddress; - private ExecutorService executor; - private TServer server; - - /** - * Creates a {@link Builder} for creating instance of {@link ThriftRPCServer}. - * @param serviceType Class of the thrift service. - * @param <I> Type of the thrift service. - * @return A {@link Builder}. - */ - public static <I> Builder<I> builder(Class<I> serviceType) { - return new Builder<I>(serviceType); - } - - /** - * Builder for creating instance of ThriftRPCServer. By default, the instance created will bind to - * random port and with 2 io threads and worker threads equals to min(2, number of cpu cores - 2). - */ - public static final class Builder<I> { - private final Class<I> serviceType; - private String name; - private InetSocketAddress bindAddress = new InetSocketAddress(0); - private int ioThreads = 2; - private int workerThreads = Runtime.getRuntime().availableProcessors() - 2; - // 16Mb - private int maxReadBufferBytes = 16 * 1024 * 1024; - - private Builder(Class<I> serviceType) { - this.serviceType = serviceType; - this.name = serviceType.getSimpleName(); - } - - public Builder<I> setName(String name) { - this.name = name; - return this; - } - - public Builder<I> setHost(String host) { - this.bindAddress = new InetSocketAddress(host, bindAddress.getPort()); - return this; - } - - public Builder<I> setPort(int port) { - this.bindAddress = new InetSocketAddress(bindAddress.getHostName(), port); - return this; - } - - public Builder<I> setIOThreads(int count) { - this.ioThreads = count; - return this; - } - - public Builder<I> setWorkerThreads(int count) { - this.workerThreads = count; - return this; - } - - public Builder<I> setMaxReadBufferBytes(int maxReadBufferBytes) { - this.maxReadBufferBytes = maxReadBufferBytes; - return this; - } - - public <T extends RPCServiceHandler> ThriftRPCServer<T, I> build(T serviceHandler) { - return new ThriftRPCServer<T, I>(bindAddress, ioThreads, workerThreads, maxReadBufferBytes, - serviceHandler, serviceType, name); - } - } - - /** - * Creates a ThriftRPCServer with the given paramters. - * - * @param bindAddress The socket address for the server to listen on. If {@code null}, it'll be binded to random - * port on localhost. - * @param ioThreads Number of io threads. - * @param workerThreads Number of worker threads. - * @param serviceHandler Handler for handling client requests. - */ - @SuppressWarnings("unchecked") - private ThriftRPCServer(InetSocketAddress bindAddress, int ioThreads, - int workerThreads, int maxReadBufferBytes, - T serviceHandler, Class<I> serviceType, String name) { - Preconditions.checkArgument(ioThreads > 0, "IO threads must be > 0."); - Preconditions.checkArgument(workerThreads > 0, "Worker threads must be > 0."); - - this.bindAddress = bindAddress; - this.ioThreads = ioThreads; - this.workerThreads = workerThreads; - this.maxReadBufferBytes = maxReadBufferBytes; - this.serviceHandler = serviceHandler; - this.name = name; - this.processor = createProcessor((Class<T>) serviceHandler.getClass(), serviceType); - } - - public InetSocketAddress getBindAddress() { - return bindAddress; - } - - @Override - protected void startUp() throws Exception { - // Determines the address and port to listen on - InetSocketAddress listenOn = bindAddress; - if (listenOn == null || listenOn.getPort() <= 0) { - int port = Networks.getRandomPort(); - if (listenOn == null) { - listenOn = new InetSocketAddress("localhost", port); - } else { - listenOn = new InetSocketAddress(listenOn.getAddress(), port); - } - } - bindAddress = listenOn; - - executor = new ThreadPoolExecutor(0, workerThreads, - 60L, TimeUnit.SECONDS, - new SynchronousQueue<Runnable>(), - Threads.createDaemonThreadFactory(String.format("%s-rpc-%%d", name)), - new ThreadPoolExecutor.CallerRunsPolicy()); - serviceHandler.init(); - - TThreadedSelectorServerWithFix.Args args = - new TThreadedSelectorServerWithFix.Args(new TNonblockingServerSocket(listenOn)) - .selectorThreads(ioThreads) - .protocolFactory(new TBinaryProtocol.Factory()) - .transportFactory(new TFramedTransport.Factory()) - .processor(processor) - .executorService(executor); - - // ENG-443 - Set the max read buffer size. This is important as this will - // prevent the server from throwing OOME if telnetd to the port - // it's running on. - args.maxReadBufferBytes = maxReadBufferBytes; - server = new TThreadedSelectorServerWithFix(args); - LOG.info("Starting RPC server for {}", name); - } - - @Override - protected void shutDown() throws Exception { - serviceHandler.destroy(); - executor.shutdownNow(); - LOG.info("RPC server for {} stopped.", name); - } - - @Override - protected void triggerShutdown() { - LOG.info("Request to stop RPC server for {}", name); - server.stop(); - } - - @Override - protected void run() throws Exception { - LOG.info("Running RPC server for {}", name); - server.serve(); - LOG.info("Done running RPC server for {}", name); - } - - @SuppressWarnings("unchecked") - private TProcessor createProcessor(final Class<T> handlerType, Class<I> serviceType) { - // Pick the Iface inner interface and the Processor class - Class<? extends TProcessor> processorType = null; - Class<?> ifaceType = null; - for (Class<?> clz : serviceType.getDeclaredClasses()) { - if (TProcessor.class.isAssignableFrom(clz)) { - processorType = (Class<? extends TProcessor>) clz; - } else if (clz.isInterface() && "Iface".equals(clz.getSimpleName())) { - ifaceType = clz; - } - } - - Preconditions.checkArgument(processorType != null, - "Missing TProcessor, %s is not a valid thrift service.", serviceType.getName()); - Preconditions.checkArgument(ifaceType != null, - "Missing Iface, %s is not a valid thrift service.", serviceType.getName()); - - // If handler already implements the Iface, simply delegate - if (ifaceType.isAssignableFrom(handlerType)) { - return createProxyProcessor(handlerType, processorType, ifaceType); - } - - throw new IllegalArgumentException("Unsupported handler type."); - } - - private TProcessor createProxyProcessor(final Class<T> handlerType, - Class<? extends TProcessor> processorType, Class<?> ifaceType) { - - try { - // Map from Iface method to handlerType method to save reflection lookup - ImmutableMap.Builder<Method, Method> builder = ImmutableMap.builder(); - for (Method method : ifaceType.getMethods()) { - Method handlerMethod = handlerType.getMethod(method.getName(), method.getParameterTypes()); - if (!handlerMethod.isAccessible()) { - handlerMethod.setAccessible(true); - } - builder.put(method, handlerMethod); - } - final Map<Method, Method> methods = builder.build(); - - Object proxy = Proxy.newProxyInstance(ifaceType.getClassLoader(), - new Class[]{ifaceType}, new InvocationHandler() { - @Override - public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { - try { - return methods.get(method).invoke(serviceHandler, args); - } catch (InvocationTargetException e) { - if (e.getCause() != null) { - throw e.getCause(); - } else { - throw e; - } - } - } - }); - - return processorType.getConstructor(ifaceType).newInstance(proxy); - } catch (Exception e) { - throw Throwables.propagate(e); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/rpc/package-info.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/rpc/package-info.java b/tephra-core/src/main/java/co/cask/tephra/rpc/package-info.java deleted file mode 100644 index 2b5bccd..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/rpc/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * This package contains class for writing RPC server and client in simple manner. - */ -package co.cask.tephra.rpc; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/ConfigModule.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/ConfigModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/ConfigModule.java deleted file mode 100644 index 92df8cd..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/runtime/ConfigModule.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.runtime; - -import com.google.inject.AbstractModule; -import org.apache.hadoop.conf.Configuration; - -/** - * Provides Guice bindings for {@link Configuration}. - */ -public final class ConfigModule extends AbstractModule { - - private final Configuration configuration; - - public ConfigModule(Configuration configuration) { - this.configuration = configuration; - } - - @Override - protected void configure() { - bind(Configuration.class).toInstance(configuration); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/DiscoveryModules.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/DiscoveryModules.java b/tephra-core/src/main/java/co/cask/tephra/runtime/DiscoveryModules.java deleted file mode 100644 index 76cbbdf..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/runtime/DiscoveryModules.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.runtime; - -import com.google.inject.AbstractModule; -import com.google.inject.Module; -import com.google.inject.PrivateModule; -import com.google.inject.Provides; -import com.google.inject.Singleton; -import org.apache.twill.common.Cancellable; -import org.apache.twill.discovery.Discoverable; -import org.apache.twill.discovery.DiscoveryService; -import org.apache.twill.discovery.DiscoveryServiceClient; -import org.apache.twill.discovery.InMemoryDiscoveryService; -import org.apache.twill.discovery.ServiceDiscovered; -import org.apache.twill.discovery.ZKDiscoveryService; -import org.apache.twill.zookeeper.ZKClientService; - -/** - * Provides access to Google Guice modules for in-memory, single-node, and distributed operation for - * {@link DiscoveryService} and {@link DiscoveryServiceClient}. - */ -public final class DiscoveryModules { - - public Module getInMemoryModules() { - return new InMemoryDiscoveryModule(); - } - - public Module getSingleNodeModules() { - return new InMemoryDiscoveryModule(); - } - - public Module getDistributedModules() { - return new ZKDiscoveryModule(); - } - - private static final class InMemoryDiscoveryModule extends AbstractModule { - - // ensuring to be singleton across JVM - private static final InMemoryDiscoveryService IN_MEMORY_DISCOVERY_SERVICE = new InMemoryDiscoveryService(); - - @Override - protected void configure() { - InMemoryDiscoveryService discovery = IN_MEMORY_DISCOVERY_SERVICE; - bind(DiscoveryService.class).toInstance(discovery); - bind(DiscoveryServiceClient.class).toInstance(discovery); - } - } - - private static final class ZKDiscoveryModule extends PrivateModule { - - @Override - protected void configure() { - expose(DiscoveryService.class); - expose(DiscoveryServiceClient.class); - } - - @Provides - @Singleton - private ZKDiscoveryService providesZKDiscoveryService(ZKClientService zkClient) { - return new ZKDiscoveryService(zkClient); - } - - @Provides - @Singleton - private DiscoveryService providesDiscoveryService(final ZKClientService zkClient, - final ZKDiscoveryService delegate) { - return new DiscoveryService() { - @Override - public Cancellable register(Discoverable discoverable) { - if (!zkClient.isRunning()) { - zkClient.startAndWait(); - } - return delegate.register(discoverable); - } - }; - } - - @Provides - @Singleton - private DiscoveryServiceClient providesDiscoveryServiceClient(final ZKClientService zkClient, - final ZKDiscoveryService delegate) { - return new DiscoveryServiceClient() { - @Override - public ServiceDiscovered discover(String s) { - if (!zkClient.isRunning()) { - zkClient.startAndWait(); - } - return delegate.discover(s); - } - }; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionClientModule.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionClientModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionClientModule.java deleted file mode 100644 index c6df2b3..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionClientModule.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.runtime; - -import co.cask.tephra.TxConstants; -import co.cask.tephra.distributed.PooledClientProvider; -import co.cask.tephra.distributed.ThreadLocalClientProvider; -import co.cask.tephra.distributed.ThriftClientProvider; -import com.google.inject.AbstractModule; -import com.google.inject.Inject; -import com.google.inject.Provider; -import com.google.inject.Singleton; -import org.apache.hadoop.conf.Configuration; -import org.apache.twill.discovery.DiscoveryServiceClient; - -/** - * Provides Guice binding for {@link co.cask.tephra.distributed.ThriftClientProvider}. - */ -public class TransactionClientModule extends AbstractModule { - - @Override - protected void configure() { - bind(ThriftClientProvider.class).toProvider(ThriftClientProviderSupplier.class); - } - - /** - * Provides implementation of {@link co.cask.tephra.distributed.ThriftClientProvider} - * based on configuration. - */ - @Singleton - private static final class ThriftClientProviderSupplier implements Provider<ThriftClientProvider> { - - private final Configuration cConf; - private DiscoveryServiceClient discoveryServiceClient; - - @Inject - ThriftClientProviderSupplier(Configuration cConf) { - this.cConf = cConf; - } - - @Inject(optional = true) - void setDiscoveryServiceClient(DiscoveryServiceClient discoveryServiceClient) { - this.discoveryServiceClient = discoveryServiceClient; - } - - @Override - public ThriftClientProvider get() { - // configure the client provider - String provider = cConf.get(TxConstants.Service.CFG_DATA_TX_CLIENT_PROVIDER, - TxConstants.Service.DEFAULT_DATA_TX_CLIENT_PROVIDER); - ThriftClientProvider clientProvider; - if ("pool".equals(provider)) { - clientProvider = new PooledClientProvider(cConf, discoveryServiceClient); - } else if ("thread-local".equals(provider)) { - clientProvider = new ThreadLocalClientProvider(cConf, discoveryServiceClient); - } else { - String message = "Unknown Transaction Service Client Provider '" + provider + "'."; - throw new IllegalArgumentException(message); - } - return clientProvider; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java deleted file mode 100644 index c88f742..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionDistributedModule.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.runtime; - -import co.cask.tephra.DefaultTransactionExecutor; -import co.cask.tephra.TransactionExecutor; -import co.cask.tephra.TransactionExecutorFactory; -import co.cask.tephra.TransactionManager; -import co.cask.tephra.TransactionSystemClient; -import co.cask.tephra.distributed.TransactionServiceClient; -import co.cask.tephra.metrics.DefaultMetricsCollector; -import co.cask.tephra.metrics.MetricsCollector; -import co.cask.tephra.persist.HDFSTransactionStateStorage; -import co.cask.tephra.persist.TransactionStateStorage; -import co.cask.tephra.snapshot.SnapshotCodecProvider; -import com.google.inject.AbstractModule; -import com.google.inject.Singleton; -import com.google.inject.assistedinject.FactoryModuleBuilder; -import com.google.inject.name.Names; - -/** - * Guice bindings for running in distributed mode on a cluster. - */ -final class TransactionDistributedModule extends AbstractModule { - - @Override - protected void configure() { - bind(SnapshotCodecProvider.class).in(Singleton.class); - bind(TransactionStateStorage.class).annotatedWith(Names.named("persist")) - .to(HDFSTransactionStateStorage.class).in(Singleton.class); - bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Singleton.class); - - bind(TransactionManager.class).in(Singleton.class); - bind(TransactionSystemClient.class).to(TransactionServiceClient.class).in(Singleton.class); - bind(MetricsCollector.class).to(DefaultMetricsCollector.class).in(Singleton.class); - - install(new FactoryModuleBuilder() - .implement(TransactionExecutor.class, DefaultTransactionExecutor.class) - .build(TransactionExecutorFactory.class)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionInMemoryModule.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionInMemoryModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionInMemoryModule.java deleted file mode 100644 index d9780e1..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionInMemoryModule.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.runtime; - -import co.cask.tephra.DefaultTransactionExecutor; -import co.cask.tephra.TransactionExecutor; -import co.cask.tephra.TransactionExecutorFactory; -import co.cask.tephra.TransactionManager; -import co.cask.tephra.TransactionSystemClient; -import co.cask.tephra.inmemory.InMemoryTxSystemClient; -import co.cask.tephra.metrics.MetricsCollector; -import co.cask.tephra.metrics.TxMetricsCollector; -import co.cask.tephra.persist.NoOpTransactionStateStorage; -import co.cask.tephra.persist.TransactionStateStorage; -import co.cask.tephra.snapshot.SnapshotCodecProvider; - -import com.google.inject.AbstractModule; -import com.google.inject.Singleton; -import com.google.inject.assistedinject.FactoryModuleBuilder; - -/** - * Guice bindings for running completely in-memory (no persistence). This should only be used for - * test classes, as the transaction state cannot be recovered in the case of a failure. - */ -public class TransactionInMemoryModule extends AbstractModule { - public TransactionInMemoryModule() { - } - - @Override - protected void configure() { - bind(SnapshotCodecProvider.class).in(Singleton.class); - bind(TransactionStateStorage.class).to(NoOpTransactionStateStorage.class).in(Singleton.class); - bind(TransactionManager.class).in(Singleton.class); - bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class); - // no metrics output for in-memory - bind(MetricsCollector.class).to(TxMetricsCollector.class); - - install(new FactoryModuleBuilder() - .implement(TransactionExecutor.class, DefaultTransactionExecutor.class) - .build(TransactionExecutorFactory.class)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionLocalModule.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionLocalModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionLocalModule.java deleted file mode 100644 index 720c84c..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionLocalModule.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.runtime; - -import co.cask.tephra.DefaultTransactionExecutor; -import co.cask.tephra.TransactionExecutor; -import co.cask.tephra.TransactionExecutorFactory; -import co.cask.tephra.TransactionManager; -import co.cask.tephra.TransactionSystemClient; -import co.cask.tephra.inmemory.InMemoryTxSystemClient; -import co.cask.tephra.metrics.DefaultMetricsCollector; -import co.cask.tephra.metrics.MetricsCollector; -import co.cask.tephra.persist.LocalFileTransactionStateStorage; -import co.cask.tephra.persist.TransactionStateStorage; -import co.cask.tephra.snapshot.SnapshotCodecProvider; -import com.google.inject.AbstractModule; -import com.google.inject.Singleton; -import com.google.inject.assistedinject.FactoryModuleBuilder; -import com.google.inject.name.Names; - -/** - * Guice bindings for running in single-node mode (persistence to local disk and in-memory client). - */ -final class TransactionLocalModule extends AbstractModule { - - @Override - protected void configure() { - bind(SnapshotCodecProvider.class).in(Singleton.class); - bind(TransactionStateStorage.class).annotatedWith(Names.named("persist")) - .to(LocalFileTransactionStateStorage.class).in(Singleton.class); - bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Singleton.class); - - bind(TransactionManager.class).in(Singleton.class); - bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class); - bind(MetricsCollector.class).to(DefaultMetricsCollector.class); - - install(new FactoryModuleBuilder() - .implement(TransactionExecutor.class, DefaultTransactionExecutor.class) - .build(TransactionExecutorFactory.class)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionModules.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionModules.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionModules.java deleted file mode 100644 index 551fb3a..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionModules.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.runtime; - -import com.google.inject.Module; - -/** - * Provides access to Google Guice modules for in-memory, single-node, and distributed operation. - */ -public class TransactionModules { - public TransactionModules() { - } - - public Module getInMemoryModules() { - return new TransactionInMemoryModule(); - } - - public Module getSingleNodeModules() { - return new TransactionLocalModule(); - } - - public Module getDistributedModules() { - return new TransactionDistributedModule(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionStateStorageProvider.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionStateStorageProvider.java b/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionStateStorageProvider.java deleted file mode 100644 index 777354c..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/runtime/TransactionStateStorageProvider.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package co.cask.tephra.runtime; - -import co.cask.tephra.TxConstants; -import co.cask.tephra.persist.NoOpTransactionStateStorage; -import co.cask.tephra.persist.TransactionStateStorage; -import com.google.inject.Inject; -import com.google.inject.Injector; -import com.google.inject.Key; -import com.google.inject.Provider; -import com.google.inject.Singleton; -import com.google.inject.name.Names; -import org.apache.hadoop.conf.Configuration; - -/** - * A provider for {@link TransactionStateStorage} that provides different - * {@link TransactionStateStorage} implementation based on configuration. - */ -@Singleton -public final class TransactionStateStorageProvider implements Provider<TransactionStateStorage> { - - private final Configuration cConf; - private final Injector injector; - - @Inject - TransactionStateStorageProvider(Configuration cConf, Injector injector) { - this.cConf = cConf; - this.injector = injector; - } - - @Override - public TransactionStateStorage get() { - if (cConf.getBoolean(TxConstants.Manager.CFG_DO_PERSIST, true)) { - return injector.getInstance(Key.get(TransactionStateStorage.class, Names.named("persist"))); - } else { - return injector.getInstance(NoOpTransactionStateStorage.class); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/runtime/ZKModule.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/runtime/ZKModule.java b/tephra-core/src/main/java/co/cask/tephra/runtime/ZKModule.java deleted file mode 100644 index ebc1df7..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/runtime/ZKModule.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.runtime; - -import co.cask.tephra.TxConstants; -import co.cask.tephra.zookeeper.TephraZKClientService; -import com.google.common.collect.ArrayListMultimap; -import com.google.inject.AbstractModule; -import com.google.inject.Provides; -import com.google.inject.Singleton; -import org.apache.hadoop.conf.Configuration; -import org.apache.twill.zookeeper.RetryStrategies; -import org.apache.twill.zookeeper.ZKClient; -import org.apache.twill.zookeeper.ZKClientService; -import org.apache.twill.zookeeper.ZKClientServices; -import org.apache.twill.zookeeper.ZKClients; - -import java.util.concurrent.TimeUnit; - -/** - * Provides Guice binding to {@link ZKClient} and {@link ZKClientService}. - */ -public class ZKModule extends AbstractModule { - - @Override - protected void configure() { - /** - * ZKClientService is provided by the provider method - * {@link #provideZKClientService(org.apache.hadoop.conf.Configuration)}. - */ - bind(ZKClient.class).to(ZKClientService.class); - } - - @Provides - @Singleton - private ZKClientService provideZKClientService(Configuration conf) { - String zkStr = conf.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM); - if (zkStr == null) { - // Default to HBase one. - zkStr = conf.get(TxConstants.HBase.ZOOKEEPER_QUORUM); - } - - int timeOut = conf.getInt(TxConstants.HBase.ZK_SESSION_TIMEOUT, TxConstants.HBase.DEFAULT_ZK_SESSION_TIMEOUT); - ZKClientService zkClientService = new TephraZKClientService(zkStr, timeOut, null, - ArrayListMultimap.<String, byte[]>create()); - return ZKClientServices.delegate( - ZKClients.reWatchOnExpire( - ZKClients.retryOnFailure(zkClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS) - ) - ) - ); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryDecoder.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryDecoder.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryDecoder.java deleted file mode 100644 index 8203494..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryDecoder.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.snapshot; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; - -/** - * An decoder to help read snapshots in binary format. - */ -public final class BinaryDecoder { - - private final InputStream input; - - /** - * @param input Stream to read from. - */ - public BinaryDecoder(InputStream input) { - this.input = input; - } - - /** - * Read one int from the input. - * @return the read number - * @throws java.io.IOException If there is IO error. - * @throws java.io.EOFException If end of file reached. - */ - public int readInt() throws IOException { - int val = 0; - int shift = 0; - int b = readByte(); - while (b > 0x7f) { - val ^= (b & 0x7f) << shift; - shift += 7; - b = readByte(); - } - val ^= b << shift; - return (val >>> 1) ^ -(val & 1); - } - - /** - * Read one long int from the input. - * @return the read number - * @throws java.io.IOException If there is IO error. - * @throws java.io.EOFException If end of file reached. - */ - public long readLong() throws IOException { - long val = 0; - int shift = 0; - int b = readByte(); - while (b > 0x7f) { - val ^= (long) (b & 0x7f) << shift; - shift += 7; - b = readByte(); - } - val ^= (long) b << shift; - return (val >>> 1) ^ -(val & 1); - } - - /** - * Read a byte sequence. First read an int to indicate how many bytes to read, then that many bytes. - * @return the read bytes as a byte array - * @throws java.io.IOException If there is IO error. - * @throws java.io.EOFException If end of file reached. - */ - public byte[] readBytes() throws IOException { - int toRead = readInt(); - byte[] bytes = new byte[toRead]; - while (toRead > 0) { - int byteRead = input.read(bytes, bytes.length - toRead, toRead); - if (byteRead == -1) { - throw new EOFException(); - } - toRead -= byteRead; - } - return bytes; - } - - /** - * Reads a single byte value. - * - * @return The byte value read. - * @throws java.io.IOException If there is IO error. - * @throws java.io.EOFException If end of file reached. - */ - private int readByte() throws IOException { - int b = input.read(); - if (b == -1) { - throw new EOFException(); - } - return b; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryEncoder.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryEncoder.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryEncoder.java deleted file mode 100644 index bc1bce0..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/snapshot/BinaryEncoder.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.snapshot; - -import java.io.IOException; -import java.io.OutputStream; - -/** - * An encoder to help encode snapshots in binary format. - */ -public final class BinaryEncoder { - - private final OutputStream output; - - /** - * @param output stream to write to - */ - public BinaryEncoder(OutputStream output) { - this.output = output; - } - - /** - * write a single int value. - * @throws java.io.IOException If there is IO error. - */ - public BinaryEncoder writeInt(int i) throws IOException { - // Compute the zig-zag value. First double the value and flip the bit if the input is negative. - int val = (i << 1) ^ (i >> 31); - - if ((val & ~0x7f) != 0) { - output.write(0x80 | val & 0x7f); - val >>>= 7; - while (val > 0x7f) { - output.write(0x80 | val & 0x7f); - val >>>= 7; - } - } - output.write(val); - - return this; - } - - /** - * write a single long int value. - * @throws java.io.IOException If there is IO error. - */ - public BinaryEncoder writeLong(long l) throws IOException { - // Compute the zig-zag value. First double the value and flip the bit if the input is negative. - long val = (l << 1) ^ (l >> 63); - - if ((val & ~0x7f) != 0) { - output.write((int) (0x80 | val & 0x7f)); - val >>>= 7; - while (val > 0x7f) { - output.write((int) (0x80 | val & 0x7f)); - val >>>= 7; - } - } - output.write((int) val); - - return this; - } - - /** - * write a sequence of bytes. First writes the number of bytes as an int, then the bytes themselves. - * @throws java.io.IOException If there is IO error. - */ - public BinaryEncoder writeBytes(byte[] bytes) throws IOException { - writeLong(bytes.length); - output.write(bytes, 0, bytes.length); - return this; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/DefaultSnapshotCodec.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/DefaultSnapshotCodec.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/DefaultSnapshotCodec.java deleted file mode 100644 index d09d79d..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/snapshot/DefaultSnapshotCodec.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.snapshot; - -import co.cask.tephra.ChangeId; -import co.cask.tephra.TransactionManager; -import co.cask.tephra.persist.TransactionSnapshot; -import co.cask.tephra.persist.TransactionVisibilityState; -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Set; -import java.util.TreeMap; - -/** - * Handles serialization/deserialization of a {@link co.cask.tephra.persist.TransactionSnapshot} - * and its elements to {@code byte[]}. - * @deprecated This codec is now deprecated and is replaced by {@link SnapshotCodecV2}. - */ -@Deprecated -public class DefaultSnapshotCodec implements SnapshotCodec { - - private static final Logger LOG = LoggerFactory.getLogger(DefaultSnapshotCodec.class); - - @Override - public int getVersion() { - return 1; - } - - @Override - public void encode(OutputStream out, TransactionSnapshot snapshot) { - try { - BinaryEncoder encoder = new BinaryEncoder(out); - - encoder.writeLong(snapshot.getTimestamp()); - encoder.writeLong(snapshot.getReadPointer()); - encoder.writeLong(snapshot.getWritePointer()); - encodeInvalid(encoder, snapshot.getInvalid()); - encodeInProgress(encoder, snapshot.getInProgress()); - encodeChangeSets(encoder, snapshot.getCommittingChangeSets()); - encodeChangeSets(encoder, snapshot.getCommittedChangeSets()); - - } catch (IOException e) { - LOG.error("Unable to serialize transaction state: ", e); - throw Throwables.propagate(e); - } - } - - @Override - public TransactionSnapshot decode(InputStream in) { - BinaryDecoder decoder = new BinaryDecoder(in); - - try { - TransactionVisibilityState minTxSnapshot = decodeTransactionVisibilityState(in); - NavigableMap<Long, Set<ChangeId>> committing = decodeChangeSets(decoder); - NavigableMap<Long, Set<ChangeId>> committed = decodeChangeSets(decoder); - return new TransactionSnapshot(minTxSnapshot.getTimestamp(), minTxSnapshot.getReadPointer(), - minTxSnapshot.getWritePointer(), minTxSnapshot.getInvalid(), - minTxSnapshot.getInProgress(), committing, committed); - } catch (IOException e) { - LOG.error("Unable to deserialize transaction state: ", e); - throw Throwables.propagate(e); - } - } - - @Override - public TransactionVisibilityState decodeTransactionVisibilityState(InputStream in) { - BinaryDecoder decoder = new BinaryDecoder(in); - try { - long timestamp = decoder.readLong(); - long readPointer = decoder.readLong(); - long writePointer = decoder.readLong(); - Collection<Long> invalid = decodeInvalid(decoder); - NavigableMap<Long, TransactionManager.InProgressTx> inProgress = decodeInProgress(decoder); - return new TransactionSnapshot(timestamp, readPointer, writePointer, invalid, inProgress); - } catch (IOException e) { - LOG.error("Unable to deserialize transaction state: ", e); - throw Throwables.propagate(e); - } - } - - private void encodeInvalid(BinaryEncoder encoder, Collection<Long> invalid) throws IOException { - if (!invalid.isEmpty()) { - encoder.writeInt(invalid.size()); - for (long invalidTx : invalid) { - encoder.writeLong(invalidTx); - } - } - encoder.writeInt(0); // zero denotes end of list as per AVRO spec - } - - private Collection<Long> decodeInvalid(BinaryDecoder decoder) throws IOException { - int size = decoder.readInt(); - Collection<Long> invalid = Lists.newArrayListWithCapacity(size); - while (size != 0) { // zero denotes end of list as per AVRO spec - for (int remaining = size; remaining > 0; --remaining) { - invalid.add(decoder.readLong()); - } - size = decoder.readInt(); - } - return invalid; - } - - protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress) - throws IOException { - - if (!inProgress.isEmpty()) { - encoder.writeInt(inProgress.size()); - for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) { - encoder.writeLong(entry.getKey()); // tx id - encoder.writeLong(entry.getValue().getExpiration()); - encoder.writeLong(entry.getValue().getVisibilityUpperBound()); - } - } - encoder.writeInt(0); // zero denotes end of list as per AVRO spec - } - - protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder) - throws IOException { - - int size = decoder.readInt(); - NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(); - while (size != 0) { // zero denotes end of list as per AVRO spec - for (int remaining = size; remaining > 0; --remaining) { - long txId = decoder.readLong(); - long expiration = decoder.readLong(); - long visibilityUpperBound = decoder.readLong(); - inProgress.put(txId, - new TransactionManager.InProgressTx(visibilityUpperBound, expiration)); - } - size = decoder.readInt(); - } - return inProgress; - } - - private void encodeChangeSets(BinaryEncoder encoder, Map<Long, Set<ChangeId>> changes) throws IOException { - if (!changes.isEmpty()) { - encoder.writeInt(changes.size()); - for (Map.Entry<Long, Set<ChangeId>> entry : changes.entrySet()) { - encoder.writeLong(entry.getKey()); - encodeChanges(encoder, entry.getValue()); - } - } - encoder.writeInt(0); // zero denotes end of list as per AVRO spec - } - - private NavigableMap<Long, Set<ChangeId>> decodeChangeSets(BinaryDecoder decoder) throws IOException { - int size = decoder.readInt(); - NavigableMap<Long, Set<ChangeId>> changeSets = new TreeMap<Long, Set<ChangeId>>(); - while (size != 0) { // zero denotes end of list as per AVRO spec - for (int remaining = size; remaining > 0; --remaining) { - changeSets.put(decoder.readLong(), decodeChanges(decoder)); - } - size = decoder.readInt(); - } - return changeSets; - } - - private void encodeChanges(BinaryEncoder encoder, Set<ChangeId> changes) throws IOException { - if (!changes.isEmpty()) { - encoder.writeInt(changes.size()); - for (ChangeId change : changes) { - encoder.writeBytes(change.getKey()); - } - } - encoder.writeInt(0); // zero denotes end of list as per AVRO spec - } - - private Set<ChangeId> decodeChanges(BinaryDecoder decoder) throws IOException { - int size = decoder.readInt(); - HashSet<ChangeId> changes = Sets.newHashSetWithExpectedSize(size); - while (size != 0) { // zero denotes end of list as per AVRO spec - for (int remaining = size; remaining > 0; --remaining) { - changes.add(new ChangeId(decoder.readBytes())); - } - size = decoder.readInt(); - } - // todo is there an immutable hash set? - return changes; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodec.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodec.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodec.java deleted file mode 100644 index 8e9624e..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodec.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.snapshot; - -import co.cask.tephra.persist.TransactionSnapshot; -import co.cask.tephra.persist.TransactionVisibilityState; - -import java.io.InputStream; -import java.io.OutputStream; - -/** - * Interface to decode and encode a transaction snapshot. Each codec implements one version of the encoding. - * It need not include the version when encoding the snapshot. - */ -public interface SnapshotCodec { - - /** - * @return the version of the encoding implemented by the codec. - */ - int getVersion(); - - /** - * Encode a transaction snapshot into an output stream. - * @param out the output stream to write to - * @param snapshot the snapshot to encode - */ - void encode(OutputStream out, TransactionSnapshot snapshot); - - /** - * Decode a transaction snapshot from an input stream. - * @param in the input stream to read from - * @return the decoded snapshot - */ - TransactionSnapshot decode(InputStream in); - - /** - * Decode transaction visibility state from an input stream. - * @param in the input stream to read from - * @return {@link TransactionVisibilityState} - */ - TransactionVisibilityState decodeTransactionVisibilityState(InputStream in); -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecProvider.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecProvider.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecProvider.java deleted file mode 100644 index 28d829c..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecProvider.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.snapshot; - -import co.cask.tephra.TxConstants; -import co.cask.tephra.persist.TransactionSnapshot; -import co.cask.tephra.persist.TransactionVisibilityState; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.inject.Inject; -import org.apache.hadoop.conf.Configuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Arrays; -import java.util.List; -import java.util.SortedMap; -import javax.annotation.Nonnull; - -/** - * Maintains the codecs for all known versions of the transaction snapshot encoding. - */ -public class SnapshotCodecProvider implements SnapshotCodec { - - private static final Logger LOG = LoggerFactory.getLogger(SnapshotCodecProvider.class); - - private final SortedMap<Integer, SnapshotCodec> codecs = Maps.newTreeMap(); - - @Inject - public SnapshotCodecProvider(Configuration configuration) { - initialize(configuration); - } - - /** - * Register all codec specified in the configuration with this provider. - * There can only be one codec for a given version. - */ - private void initialize(Configuration configuration) { - String[] codecClassNames = configuration.getTrimmedStrings(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES); - List<Class> codecClasses = Lists.newArrayList(); - if (codecClassNames != null) { - for (String clsName : codecClassNames) { - try { - codecClasses.add(Class.forName(clsName)); - } catch (ClassNotFoundException cnfe) { - LOG.warn("Unable to load class configured for " + TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES - + ": " + clsName, cnfe); - } - } - } - - if (codecClasses.size() == 0) { - codecClasses.addAll(Arrays.asList(TxConstants.Persist.DEFAULT_TX_SNAPHOT_CODEC_CLASSES)); - } - for (Class<?> codecClass : codecClasses) { - try { - SnapshotCodec codec = (SnapshotCodec) (codecClass.newInstance()); - codecs.put(codec.getVersion(), codec); - LOG.debug("Using snapshot codec {} for snapshots of version {}", codecClass.getName(), codec.getVersion()); - } catch (Exception e) { - LOG.warn("Error instantiating snapshot codec {}. Skipping.", codecClass.getName(), e); - } - } - } - - /** - * Retrieve the codec for a particular version of the encoding. - * @param version the version of interest - * @return the corresponding codec - * @throws java.lang.IllegalArgumentException if the version is not known - */ - @Nonnull - @VisibleForTesting - SnapshotCodec getCodecForVersion(int version) { - SnapshotCodec codec = codecs.get(version); - if (codec == null) { - throw new IllegalArgumentException(String.format("Version %d of snapshot encoding is not supported", version)); - } - return codec; - } - - /** - * Retrieve the current snapshot codec, that is, the codec with the highest known version. - * @return the current codec - * @throws java.lang.IllegalStateException if no codecs are registered - */ - private SnapshotCodec getCurrentCodec() { - if (codecs.isEmpty()) { - throw new IllegalStateException(String.format("No codecs are registered.")); - } - return codecs.get(codecs.lastKey()); - } - - // Return the appropriate codec for the version in InputStream - private SnapshotCodec getCodec(InputStream in) { - BinaryDecoder decoder = new BinaryDecoder(in); - int persistedVersion; - try { - persistedVersion = decoder.readInt(); - } catch (IOException e) { - LOG.error("Unable to read transaction state version: ", e); - throw Throwables.propagate(e); - } - return getCodecForVersion(persistedVersion); - } - - @Override - public int getVersion() { - return getCurrentCodec().getVersion(); - } - - @Override - public TransactionSnapshot decode(InputStream in) { - return getCodec(in).decode(in); - } - - @Override - public TransactionVisibilityState decodeTransactionVisibilityState(InputStream in) { - return getCodec(in).decodeTransactionVisibilityState(in); - } - - @Override - public void encode(OutputStream out, TransactionSnapshot snapshot) { - SnapshotCodec codec = getCurrentCodec(); - try { - new BinaryEncoder(out).writeInt(codec.getVersion()); - } catch (IOException e) { - LOG.error("Unable to write transaction state version: ", e); - throw Throwables.propagate(e); - } - codec.encode(out, snapshot); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV2.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV2.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV2.java deleted file mode 100644 index 5d07ba5..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV2.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.snapshot; - -import co.cask.tephra.TransactionManager; -import co.cask.tephra.TransactionType; -import com.google.common.collect.Maps; -import it.unimi.dsi.fastutil.longs.LongArrayList; - -import java.io.IOException; -import java.util.Map; -import java.util.NavigableMap; - -/** - * Handles serialization/deserialization of a {@link co.cask.tephra.persist.TransactionSnapshot} - * and its elements to {@code byte[]}. - */ -public class SnapshotCodecV2 extends DefaultSnapshotCodec { - @Override - public int getVersion() { - return 2; - } - - @Override - protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress) - throws IOException { - - if (!inProgress.isEmpty()) { - encoder.writeInt(inProgress.size()); - for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) { - encoder.writeLong(entry.getKey()); // tx id - encoder.writeLong(entry.getValue().getExpiration()); - encoder.writeLong(entry.getValue().getVisibilityUpperBound()); - encoder.writeInt(entry.getValue().getType().ordinal()); - } - } - encoder.writeInt(0); // zero denotes end of list as per AVRO spec - } - - @Override - protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder) - throws IOException { - - int size = decoder.readInt(); - NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(); - while (size != 0) { // zero denotes end of list as per AVRO spec - for (int remaining = size; remaining > 0; --remaining) { - long txId = decoder.readLong(); - long expiration = decoder.readLong(); - long visibilityUpperBound = decoder.readLong(); - int txTypeIdx = decoder.readInt(); - TransactionType txType; - try { - txType = TransactionType.values()[txTypeIdx]; - } catch (ArrayIndexOutOfBoundsException e) { - throw new IOException("Type enum ordinal value is out of range: " + txTypeIdx); - } - inProgress.put(txId, - new TransactionManager.InProgressTx(visibilityUpperBound, expiration, txType, - new LongArrayList())); - } - size = decoder.readInt(); - } - return inProgress; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV3.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV3.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV3.java deleted file mode 100644 index b11821a..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV3.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.snapshot; - -/** - * Handles serialization/deserialization of a {@link co.cask.tephra.persist.TransactionSnapshot} - * and its elements to {@code byte[]}. - * - * <p>The serialization/deserialization of this codec is the same as that performed by {@link SnapshotCodecV2}, - * but a new version number is used to allow easy migration from projects using deprecated codecs with - * conflicting version numbers.</p> - */ -public class SnapshotCodecV3 extends SnapshotCodecV2 { - @Override - public int getVersion() { - return 3; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV4.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV4.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV4.java deleted file mode 100644 index 41d30f2..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/snapshot/SnapshotCodecV4.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.snapshot; - -import co.cask.tephra.TransactionManager; -import co.cask.tephra.TransactionType; -import com.google.common.collect.Maps; -import it.unimi.dsi.fastutil.longs.LongArrayList; - -import java.io.IOException; -import java.util.Map; -import java.util.NavigableMap; - -/** - * Handles serialization/deserialization of a {@link co.cask.tephra.persist.TransactionSnapshot} - * and its elements to {@code byte[]}. - * - */ -public class SnapshotCodecV4 extends SnapshotCodecV2 { - @Override - public int getVersion() { - return 4; - } - - @Override - protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress) - throws IOException { - - if (!inProgress.isEmpty()) { - encoder.writeInt(inProgress.size()); - for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) { - encoder.writeLong(entry.getKey()); // tx id - encoder.writeLong(entry.getValue().getExpiration()); - encoder.writeLong(entry.getValue().getVisibilityUpperBound()); - encoder.writeInt(entry.getValue().getType().ordinal()); - // write checkpoint tx IDs - LongArrayList checkpointPointers = entry.getValue().getCheckpointWritePointers(); - if (checkpointPointers != null && !checkpointPointers.isEmpty()) { - encoder.writeInt(checkpointPointers.size()); - for (int i = 0; i < checkpointPointers.size(); i++) { - encoder.writeLong(checkpointPointers.getLong(i)); - } - } - encoder.writeInt(0); - } - } - encoder.writeInt(0); // zero denotes end of list as per AVRO spec - } - - @Override - protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder) - throws IOException { - - int size = decoder.readInt(); - NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(); - while (size != 0) { // zero denotes end of list as per AVRO spec - for (int remaining = size; remaining > 0; --remaining) { - long txId = decoder.readLong(); - long expiration = decoder.readLong(); - long visibilityUpperBound = decoder.readLong(); - int txTypeIdx = decoder.readInt(); - TransactionType txType; - try { - txType = TransactionType.values()[txTypeIdx]; - } catch (ArrayIndexOutOfBoundsException e) { - throw new IOException("Type enum ordinal value is out of range: " + txTypeIdx); - } - // read checkpoint tx IDs - int checkpointPointerSize = decoder.readInt(); - LongArrayList checkpointPointers = new LongArrayList(checkpointPointerSize); - while (checkpointPointerSize != 0) { - for (int checkpointRemaining = checkpointPointerSize; checkpointRemaining > 0; --checkpointRemaining) { - checkpointPointers.add(decoder.readLong()); - } - checkpointPointerSize = decoder.readInt(); - } - inProgress.put(txId, - new TransactionManager.InProgressTx(visibilityUpperBound, expiration, txType, checkpointPointers)); - } - size = decoder.readInt(); - } - return inProgress; - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/snapshot/package-info.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/snapshot/package-info.java b/tephra-core/src/main/java/co/cask/tephra/snapshot/package-info.java deleted file mode 100644 index e2d044c..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/snapshot/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This package contains interfaces and implementations for encoding and decoding transaction snapshots. - */ -package co.cask.tephra.snapshot; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationFactory.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationFactory.java b/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationFactory.java deleted file mode 100644 index e830524..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationFactory.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.util; - -import com.google.inject.Provider; -import org.apache.hadoop.conf.Configuration; - -/** - * Provides {@code org.apache.hadoop.conf.Configuration} instances, constructed by the correct version used - * for the runtime. - */ -public class ConfigurationFactory implements Provider<Configuration> { - private static class ConfigurationProviderFactory extends HBaseVersionSpecificFactory<ConfigurationProvider> { - @Override - protected String getHBase96Classname() { - return "co.cask.tephra.hbase96.HBase96ConfigurationProvider"; - } - - @Override - protected String getHBase98Classname() { - return "co.cask.tephra.hbase98.HBase98ConfigurationProvider"; - } - - @Override - protected String getHBase10Classname() { - return "co.cask.tephra.hbase10.HBase10ConfigurationProvider"; - } - - @Override - protected String getHBase11Classname() { - return "co.cask.tephra.hbase11.HBase11ConfigurationProvider"; - } - - @Override - protected String getHBase10CDHClassname() { - return "co.cask.tephra.hbase10cdh.HBase10ConfigurationProvider"; - } - } - - private final ConfigurationProvider provider = new ConfigurationProviderFactory().get(); - - /** - * Returns a new {@link org.apache.hadoop.conf.Configuration} instance from the HBase version-specific factory. - */ - @Override - public Configuration get() { - return provider.get(); - } - - /** - * Returns a new {@link org.apache.hadoop.conf.Configuration} instance from the HBase version-specific factory. - * - * @param baseConf additional configuration properties to merge on to the classpath configuration - * @return the merged configuration - */ - public Configuration get(Configuration baseConf) { - return provider.get(baseConf); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationProvider.java b/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationProvider.java deleted file mode 100644 index e88acf8..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/util/ConfigurationProvider.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.util; - -import com.google.inject.Provider; -import org.apache.hadoop.conf.Configuration; - -/** - * Provides {@code Configuration} instances, constructed by the HBase version on which we are running. - */ -public abstract class ConfigurationProvider implements Provider<Configuration> { - @Override - public abstract Configuration get(); - - public abstract Configuration get(Configuration baseConf); -} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersion.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersion.java b/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersion.java deleted file mode 100644 index 84311c9..0000000 --- a/tephra-core/src/main/java/co/cask/tephra/util/HBaseVersion.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package co.cask.tephra.util; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.lang.reflect.Method; -import java.text.ParseException; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Detects the currently loaded HBase version. It is assumed that only one HBase version is loaded at a time, - * since using more than one HBase version within the same process will require classloader isolation anyway. - */ -public class HBaseVersion { - private static final String HBASE_94_VERSION = "0.94"; - private static final String HBASE_96_VERSION = "0.96"; - private static final String HBASE_98_VERSION = "0.98"; - private static final String HBASE_10_VERSION = "1.0"; - private static final String HBASE_11_VERSION = "1.1"; - private static final String HBASE_12_VERSION = "1.2"; - private static final String CDH_CLASSIFIER = "cdh"; - - private static final Logger LOG = LoggerFactory.getLogger(HBaseVersion.class); - - /** - * Represents the major version of the HBase library that is currently loaded. - */ - public enum Version { - HBASE_94("0.94"), - HBASE_96("0.96"), - HBASE_98("0.98"), - HBASE_10("1.0"), - HBASE_10_CDH("1.0-cdh"), - HBASE_11("1.1"), - HBASE_12_CDH("1.2-cdh"), - UNKNOWN("unknown"); - - final String majorVersion; - - Version(String majorVersion) { - this.majorVersion = majorVersion; - } - - public String getMajorVersion() { - return majorVersion; - } - } - - private static Version currentVersion; - private static String versionString; - static { - try { - Class versionInfoClass = Class.forName("org.apache.hadoop.hbase.util.VersionInfo"); - Method versionMethod = versionInfoClass.getMethod("getVersion"); - versionString = (String) versionMethod.invoke(null); - if (versionString.startsWith(HBASE_94_VERSION)) { - currentVersion = Version.HBASE_94; - } else if (versionString.startsWith(HBASE_96_VERSION)) { - currentVersion = Version.HBASE_96; - } else if (versionString.startsWith(HBASE_98_VERSION)) { - currentVersion = Version.HBASE_98; - } else if (versionString.startsWith(HBASE_10_VERSION)) { - VersionNumber ver = VersionNumber.create(versionString); - if (ver.getClassifier() != null && ver.getClassifier().startsWith(CDH_CLASSIFIER)) { - currentVersion = Version.HBASE_10_CDH; - } else { - currentVersion = Version.HBASE_10; - } - } else if (versionString.startsWith(HBASE_11_VERSION)) { - currentVersion = Version.HBASE_11; - } else if (versionString.startsWith(HBASE_12_VERSION)) { - VersionNumber ver = VersionNumber.create(versionString); - if (ver.getClassifier() != null && ver.getClassifier().startsWith(CDH_CLASSIFIER)) { - currentVersion = Version.HBASE_12_CDH; - } else { - // CDH 5.7 comes with HBase version 1.2.0-CDH5.7.0. However currently there is no - // other hadoop distribution that uses HBase 1.2, so the version is set here to UNKNOWN. - currentVersion = Version.UNKNOWN; - } - } else { - currentVersion = Version.UNKNOWN; - } - } catch (Throwable e) { - // must be a class loading exception, HBase is not there - LOG.error("Unable to determine HBase version from string '{}', are HBase classes available?", versionString); - LOG.error("Exception was: ", e); - currentVersion = Version.UNKNOWN; - } - } - - /** - * Returns the major version of the currently loaded HBase library. - */ - public static Version get() { - return currentVersion; - } - - /** - * Returns the full version string for the currently loaded HBase library. - */ - public static String getVersionString() { - return versionString; - } - - /** - * Prints out the HBase {@link Version} enum value for the current version of HBase on the classpath. - */ - public static void main(String[] args) { - boolean verbose = args.length == 1 && "-v".equals(args[0]); - Version version = HBaseVersion.get(); - System.out.println(version.getMajorVersion()); - if (verbose) { - System.out.println("versionString=" + getVersionString()); - } - } - - /** - * Utility class to parse apart version number components. The version string provided is expected to be in - * the format: major[.minor[.patch[.last]][-classifier][-SNAPSHOT] - * - * <p>Only the major version number is actually required.</p> - */ - public static class VersionNumber { - private static final Pattern PATTERN = - Pattern.compile("(\\d+)(\\.(\\d+))?(\\.(\\d+))?(\\.(\\d+))?(\\-(?!SNAPSHOT)([^\\-]+))?(\\-SNAPSHOT)?"); - - private Integer major; - private Integer minor; - private Integer patch; - private Integer last; - private String classifier; - private boolean snapshot; - - private VersionNumber(Integer major, Integer minor, Integer patch, Integer last, - String classifier, boolean snapshot) { - this.major = major; - this.minor = minor; - this.patch = patch; - this.last = last; - this.classifier = classifier; - this.snapshot = snapshot; - } - - public Integer getMajor() { - return major; - } - - public Integer getMinor() { - return minor; - } - - public Integer getPatch() { - return patch; - } - - public Integer getLast() { - return last; - } - - public String getClassifier() { - return classifier; - } - - public boolean isSnapshot() { - return snapshot; - } - - public static VersionNumber create(String versionString) throws ParseException { - Matcher matcher = PATTERN.matcher(versionString); - if (matcher.matches()) { - String majorString = matcher.group(1); - String minorString = matcher.group(3); - String patchString = matcher.group(5); - String last = matcher.group(7); - String classifier = matcher.group(9); - String snapshotString = matcher.group(10); - return new VersionNumber(new Integer(majorString), - minorString != null ? new Integer(minorString) : null, - patchString != null ? new Integer(patchString) : null, - last != null ? new Integer(last) : null, - classifier, - "-SNAPSHOT".equals(snapshotString)); - } - throw new ParseException( - "Input string did not match expected pattern: major[.minor[.patch]][-classifier][-SNAPSHOT]", 0); - } - } -}
