http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/rpc/ThriftRPCServer.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/rpc/ThriftRPCServer.java b/tephra-core/src/main/java/org/apache/tephra/rpc/ThriftRPCServer.java new file mode 100644 index 0000000..738d6c8 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/rpc/ThriftRPCServer.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tephra.rpc; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.AbstractExecutionThreadService; +import org.apache.thrift.TProcessor; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadedSelectorServerWithFix; +import org.apache.thrift.transport.TFramedTransport; +import org.apache.thrift.transport.TNonblockingServerSocket; +import org.apache.twill.common.Threads; +import org.apache.twill.internal.utils.Networks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * @param <T> The type of service handler interface. + * @param <I> The type of the thrift service. + */ +public final class ThriftRPCServer<T extends RPCServiceHandler, I> extends AbstractExecutionThreadService { + + private static final Logger LOG = LoggerFactory.getLogger(ThriftRPCServer.class); + + private final String name; + private final int ioThreads; + private final int workerThreads; + private final int maxReadBufferBytes; + private final T serviceHandler; + private final TProcessor processor; + + private InetSocketAddress bindAddress; + private ExecutorService executor; + private TServer server; + + /** + * Creates a {@link Builder} for creating instance of {@link ThriftRPCServer}. + * @param serviceType Class of the thrift service. + * @param <I> Type of the thrift service. + * @return A {@link Builder}. 
+ */ + public static <I> Builder<I> builder(Class<I> serviceType) { + return new Builder<I>(serviceType); + } + + /** + * Builder for creating instance of ThriftRPCServer. By default, the instance created will bind to + * random port and with 2 io threads and worker threads equals to min(2, number of cpu cores - 2). + */ + public static final class Builder<I> { + private final Class<I> serviceType; + private String name; + private InetSocketAddress bindAddress = new InetSocketAddress(0); + private int ioThreads = 2; + private int workerThreads = Runtime.getRuntime().availableProcessors() - 2; + // 16Mb + private int maxReadBufferBytes = 16 * 1024 * 1024; + + private Builder(Class<I> serviceType) { + this.serviceType = serviceType; + this.name = serviceType.getSimpleName(); + } + + public Builder<I> setName(String name) { + this.name = name; + return this; + } + + public Builder<I> setHost(String host) { + this.bindAddress = new InetSocketAddress(host, bindAddress.getPort()); + return this; + } + + public Builder<I> setPort(int port) { + this.bindAddress = new InetSocketAddress(bindAddress.getHostName(), port); + return this; + } + + public Builder<I> setIOThreads(int count) { + this.ioThreads = count; + return this; + } + + public Builder<I> setWorkerThreads(int count) { + this.workerThreads = count; + return this; + } + + public Builder<I> setMaxReadBufferBytes(int maxReadBufferBytes) { + this.maxReadBufferBytes = maxReadBufferBytes; + return this; + } + + public <T extends RPCServiceHandler> ThriftRPCServer<T, I> build(T serviceHandler) { + return new ThriftRPCServer<T, I>(bindAddress, ioThreads, workerThreads, maxReadBufferBytes, + serviceHandler, serviceType, name); + } + } + + /** + * Creates a ThriftRPCServer with the given paramters. + * + * @param bindAddress The socket address for the server to listen on. If {@code null}, it'll be binded to random + * port on localhost. + * @param ioThreads Number of io threads. 
+ * @param workerThreads Number of worker threads. + * @param serviceHandler Handler for handling client requests. + */ + @SuppressWarnings("unchecked") + private ThriftRPCServer(InetSocketAddress bindAddress, int ioThreads, + int workerThreads, int maxReadBufferBytes, + T serviceHandler, Class<I> serviceType, String name) { + Preconditions.checkArgument(ioThreads > 0, "IO threads must be > 0."); + Preconditions.checkArgument(workerThreads > 0, "Worker threads must be > 0."); + + this.bindAddress = bindAddress; + this.ioThreads = ioThreads; + this.workerThreads = workerThreads; + this.maxReadBufferBytes = maxReadBufferBytes; + this.serviceHandler = serviceHandler; + this.name = name; + this.processor = createProcessor((Class<T>) serviceHandler.getClass(), serviceType); + } + + public InetSocketAddress getBindAddress() { + return bindAddress; + } + + @Override + protected void startUp() throws Exception { + // Determines the address and port to listen on + InetSocketAddress listenOn = bindAddress; + if (listenOn == null || listenOn.getPort() <= 0) { + int port = Networks.getRandomPort(); + if (listenOn == null) { + listenOn = new InetSocketAddress("localhost", port); + } else { + listenOn = new InetSocketAddress(listenOn.getAddress(), port); + } + } + bindAddress = listenOn; + + executor = new ThreadPoolExecutor(0, workerThreads, + 60L, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), + Threads.createDaemonThreadFactory(String.format("%s-rpc-%%d", name)), + new ThreadPoolExecutor.CallerRunsPolicy()); + serviceHandler.init(); + + TThreadedSelectorServerWithFix.Args args = + new TThreadedSelectorServerWithFix.Args(new TNonblockingServerSocket(listenOn)) + .selectorThreads(ioThreads) + .protocolFactory(new TBinaryProtocol.Factory()) + .transportFactory(new TFramedTransport.Factory()) + .processor(processor) + .executorService(executor); + + // ENG-443 - Set the max read buffer size. 
This is important as this will + // prevent the server from throwing OOME if telnetd to the port + // it's running on. + args.maxReadBufferBytes = maxReadBufferBytes; + server = new TThreadedSelectorServerWithFix(args); + LOG.info("Starting RPC server for {}", name); + } + + @Override + protected void shutDown() throws Exception { + serviceHandler.destroy(); + executor.shutdownNow(); + LOG.info("RPC server for {} stopped.", name); + } + + @Override + protected void triggerShutdown() { + LOG.info("Request to stop RPC server for {}", name); + server.stop(); + } + + @Override + protected void run() throws Exception { + LOG.info("Running RPC server for {}", name); + server.serve(); + LOG.info("Done running RPC server for {}", name); + } + + @SuppressWarnings("unchecked") + private TProcessor createProcessor(final Class<T> handlerType, Class<I> serviceType) { + // Pick the Iface inner interface and the Processor class + Class<? extends TProcessor> processorType = null; + Class<?> ifaceType = null; + for (Class<?> clz : serviceType.getDeclaredClasses()) { + if (TProcessor.class.isAssignableFrom(clz)) { + processorType = (Class<? extends TProcessor>) clz; + } else if (clz.isInterface() && "Iface".equals(clz.getSimpleName())) { + ifaceType = clz; + } + } + + Preconditions.checkArgument(processorType != null, + "Missing TProcessor, %s is not a valid thrift service.", serviceType.getName()); + Preconditions.checkArgument(ifaceType != null, + "Missing Iface, %s is not a valid thrift service.", serviceType.getName()); + + // If handler already implements the Iface, simply delegate + if (ifaceType.isAssignableFrom(handlerType)) { + return createProxyProcessor(handlerType, processorType, ifaceType); + } + + throw new IllegalArgumentException("Unsupported handler type."); + } + + private TProcessor createProxyProcessor(final Class<T> handlerType, + Class<? 
extends TProcessor> processorType, Class<?> ifaceType) { + + try { + // Map from Iface method to handlerType method to save reflection lookup + ImmutableMap.Builder<Method, Method> builder = ImmutableMap.builder(); + for (Method method : ifaceType.getMethods()) { + Method handlerMethod = handlerType.getMethod(method.getName(), method.getParameterTypes()); + if (!handlerMethod.isAccessible()) { + handlerMethod.setAccessible(true); + } + builder.put(method, handlerMethod); + } + final Map<Method, Method> methods = builder.build(); + + Object proxy = Proxy.newProxyInstance(ifaceType.getClassLoader(), + new Class[]{ifaceType}, new InvocationHandler() { + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + try { + return methods.get(method).invoke(serviceHandler, args); + } catch (InvocationTargetException e) { + if (e.getCause() != null) { + throw e.getCause(); + } else { + throw e; + } + } + } + }); + + return processorType.getConstructor(ifaceType).newInstance(proxy); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/rpc/package-info.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/rpc/package-info.java b/tephra-core/src/main/java/org/apache/tephra/rpc/package-info.java new file mode 100644 index 0000000..96a5fe5 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/rpc/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains classes for writing RPC servers and clients in a simple manner. 
+ */ +package org.apache.tephra.rpc; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/ConfigModule.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/ConfigModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/ConfigModule.java new file mode 100644 index 0000000..b8ae111 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/ConfigModule.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.runtime; + +import com.google.inject.AbstractModule; +import org.apache.hadoop.conf.Configuration; + +/** + * Provides Guice bindings for {@link Configuration}. 
+ */ +public final class ConfigModule extends AbstractModule { + + private final Configuration configuration; + + public ConfigModule(Configuration configuration) { + this.configuration = configuration; + } + + @Override + protected void configure() { + bind(Configuration.class).toInstance(configuration); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/DiscoveryModules.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/DiscoveryModules.java b/tephra-core/src/main/java/org/apache/tephra/runtime/DiscoveryModules.java new file mode 100644 index 0000000..b4e2d1b --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/DiscoveryModules.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra.runtime; + +import com.google.inject.AbstractModule; +import com.google.inject.Module; +import com.google.inject.PrivateModule; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import org.apache.twill.common.Cancellable; +import org.apache.twill.discovery.Discoverable; +import org.apache.twill.discovery.DiscoveryService; +import org.apache.twill.discovery.DiscoveryServiceClient; +import org.apache.twill.discovery.InMemoryDiscoveryService; +import org.apache.twill.discovery.ServiceDiscovered; +import org.apache.twill.discovery.ZKDiscoveryService; +import org.apache.twill.zookeeper.ZKClientService; + +/** + * Provides access to Google Guice modules for in-memory, single-node, and distributed operation for + * {@link DiscoveryService} and {@link DiscoveryServiceClient}. + */ +public final class DiscoveryModules { + + public Module getInMemoryModules() { + return new InMemoryDiscoveryModule(); + } + + public Module getSingleNodeModules() { + return new InMemoryDiscoveryModule(); + } + + public Module getDistributedModules() { + return new ZKDiscoveryModule(); + } + + private static final class InMemoryDiscoveryModule extends AbstractModule { + + // ensuring to be singleton across JVM + private static final InMemoryDiscoveryService IN_MEMORY_DISCOVERY_SERVICE = new InMemoryDiscoveryService(); + + @Override + protected void configure() { + InMemoryDiscoveryService discovery = IN_MEMORY_DISCOVERY_SERVICE; + bind(DiscoveryService.class).toInstance(discovery); + bind(DiscoveryServiceClient.class).toInstance(discovery); + } + } + + private static final class ZKDiscoveryModule extends PrivateModule { + + @Override + protected void configure() { + expose(DiscoveryService.class); + expose(DiscoveryServiceClient.class); + } + + @Provides + @Singleton + private ZKDiscoveryService providesZKDiscoveryService(ZKClientService zkClient) { + return new ZKDiscoveryService(zkClient); + } + + @Provides + @Singleton + private 
DiscoveryService providesDiscoveryService(final ZKClientService zkClient, + final ZKDiscoveryService delegate) { + return new DiscoveryService() { + @Override + public Cancellable register(Discoverable discoverable) { + if (!zkClient.isRunning()) { + zkClient.startAndWait(); + } + return delegate.register(discoverable); + } + }; + } + + @Provides + @Singleton + private DiscoveryServiceClient providesDiscoveryServiceClient(final ZKClientService zkClient, + final ZKDiscoveryService delegate) { + return new DiscoveryServiceClient() { + @Override + public ServiceDiscovered discover(String s) { + if (!zkClient.isRunning()) { + zkClient.startAndWait(); + } + return delegate.discover(s); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionClientModule.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionClientModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionClientModule.java new file mode 100644 index 0000000..6689d05 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionClientModule.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.runtime; + +import com.google.inject.AbstractModule; +import com.google.inject.Inject; +import com.google.inject.Provider; +import com.google.inject.Singleton; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.TxConstants; +import org.apache.tephra.distributed.PooledClientProvider; +import org.apache.tephra.distributed.ThreadLocalClientProvider; +import org.apache.tephra.distributed.ThriftClientProvider; +import org.apache.twill.discovery.DiscoveryServiceClient; + +/** + * Provides Guice binding for {@link ThriftClientProvider}. + */ +public class TransactionClientModule extends AbstractModule { + + @Override + protected void configure() { + bind(ThriftClientProvider.class).toProvider(ThriftClientProviderSupplier.class); + } + + /** + * Provides implementation of {@link ThriftClientProvider} + * based on configuration. + */ + @Singleton + private static final class ThriftClientProviderSupplier implements Provider<ThriftClientProvider> { + + private final Configuration cConf; + private DiscoveryServiceClient discoveryServiceClient; + + @Inject + ThriftClientProviderSupplier(Configuration cConf) { + this.cConf = cConf; + } + + @Inject(optional = true) + void setDiscoveryServiceClient(DiscoveryServiceClient discoveryServiceClient) { + this.discoveryServiceClient = discoveryServiceClient; + } + + @Override + public ThriftClientProvider get() { + // configure the client provider + String provider = cConf.get(TxConstants.Service.CFG_DATA_TX_CLIENT_PROVIDER, + TxConstants.Service.DEFAULT_DATA_TX_CLIENT_PROVIDER); + ThriftClientProvider clientProvider; + if ("pool".equals(provider)) { + clientProvider = new PooledClientProvider(cConf, discoveryServiceClient); + } else if ("thread-local".equals(provider)) { + clientProvider = new ThreadLocalClientProvider(cConf, discoveryServiceClient); + } else { + String message 
= "Unknown Transaction Service Client Provider '" + provider + "'."; + throw new IllegalArgumentException(message); + } + return clientProvider; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java new file mode 100644 index 0000000..aaf3534 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra.runtime; + +import com.google.inject.AbstractModule; +import com.google.inject.Singleton; +import com.google.inject.assistedinject.FactoryModuleBuilder; +import com.google.inject.name.Names; +import org.apache.tephra.DefaultTransactionExecutor; +import org.apache.tephra.TransactionExecutor; +import org.apache.tephra.TransactionExecutorFactory; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.distributed.TransactionServiceClient; +import org.apache.tephra.metrics.DefaultMetricsCollector; +import org.apache.tephra.metrics.MetricsCollector; +import org.apache.tephra.persist.HDFSTransactionStateStorage; +import org.apache.tephra.persist.TransactionStateStorage; +import org.apache.tephra.snapshot.SnapshotCodecProvider; + +/** + * Guice bindings for running in distributed mode on a cluster. + */ +final class TransactionDistributedModule extends AbstractModule { + + @Override + protected void configure() { + bind(SnapshotCodecProvider.class).in(Singleton.class); + bind(TransactionStateStorage.class).annotatedWith(Names.named("persist")) + .to(HDFSTransactionStateStorage.class).in(Singleton.class); + bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Singleton.class); + + bind(TransactionManager.class).in(Singleton.class); + bind(TransactionSystemClient.class).to(TransactionServiceClient.class).in(Singleton.class); + bind(MetricsCollector.class).to(DefaultMetricsCollector.class).in(Singleton.class); + + install(new FactoryModuleBuilder() + .implement(TransactionExecutor.class, DefaultTransactionExecutor.class) + .build(TransactionExecutorFactory.class)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java ---------------------------------------------------------------------- diff --git 
a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java new file mode 100644 index 0000000..de7678a --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra.runtime; + +import com.google.inject.AbstractModule; +import com.google.inject.Singleton; +import com.google.inject.assistedinject.FactoryModuleBuilder; +import org.apache.tephra.DefaultTransactionExecutor; +import org.apache.tephra.TransactionExecutor; +import org.apache.tephra.TransactionExecutorFactory; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.inmemory.InMemoryTxSystemClient; +import org.apache.tephra.metrics.MetricsCollector; +import org.apache.tephra.metrics.TxMetricsCollector; +import org.apache.tephra.persist.NoOpTransactionStateStorage; +import org.apache.tephra.persist.TransactionStateStorage; +import org.apache.tephra.snapshot.SnapshotCodecProvider; + +/** + * Guice bindings for running completely in-memory (no persistence). This should only be used for + * test classes, as the transaction state cannot be recovered in the case of a failure. + */ +public class TransactionInMemoryModule extends AbstractModule { + public TransactionInMemoryModule() { + } + + @Override + protected void configure() { + bind(SnapshotCodecProvider.class).in(Singleton.class); + bind(TransactionStateStorage.class).to(NoOpTransactionStateStorage.class).in(Singleton.class); + bind(TransactionManager.class).in(Singleton.class); + bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class); + // no metrics output for in-memory + bind(MetricsCollector.class).to(TxMetricsCollector.class); + + install(new FactoryModuleBuilder() + .implement(TransactionExecutor.class, DefaultTransactionExecutor.class) + .build(TransactionExecutorFactory.class)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java ---------------------------------------------------------------------- diff --git 
a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java new file mode 100644 index 0000000..7d0b663 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra.runtime; + +import com.google.inject.AbstractModule; +import com.google.inject.Singleton; +import com.google.inject.assistedinject.FactoryModuleBuilder; +import com.google.inject.name.Names; +import org.apache.tephra.DefaultTransactionExecutor; +import org.apache.tephra.TransactionExecutor; +import org.apache.tephra.TransactionExecutorFactory; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionSystemClient; +import org.apache.tephra.inmemory.InMemoryTxSystemClient; +import org.apache.tephra.metrics.DefaultMetricsCollector; +import org.apache.tephra.metrics.MetricsCollector; +import org.apache.tephra.persist.LocalFileTransactionStateStorage; +import org.apache.tephra.persist.TransactionStateStorage; +import org.apache.tephra.snapshot.SnapshotCodecProvider; + +/** + * Guice bindings for running in single-node mode (persistence to local disk and in-memory client). + */ +final class TransactionLocalModule extends AbstractModule { + + @Override + protected void configure() { + bind(SnapshotCodecProvider.class).in(Singleton.class); + bind(TransactionStateStorage.class).annotatedWith(Names.named("persist")) + .to(LocalFileTransactionStateStorage.class).in(Singleton.class); + bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class).in(Singleton.class); + + bind(TransactionManager.class).in(Singleton.class); + bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class); + bind(MetricsCollector.class).to(DefaultMetricsCollector.class); + + install(new FactoryModuleBuilder() + .implement(TransactionExecutor.class, DefaultTransactionExecutor.class) + .build(TransactionExecutorFactory.class)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java ---------------------------------------------------------------------- diff --git 
a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java new file mode 100644 index 0000000..a3fe1c1 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.runtime; + +import com.google.inject.Module; + +/** + * Provides access to Google Guice modules for in-memory, single-node, and distributed operation. 
+ */ +public class TransactionModules { + public TransactionModules() { + } + + public Module getInMemoryModules() { + return new TransactionInMemoryModule(); + } + + public Module getSingleNodeModules() { + return new TransactionLocalModule(); + } + + public Module getDistributedModules() { + return new TransactionDistributedModule(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionStateStorageProvider.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionStateStorageProvider.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionStateStorageProvider.java new file mode 100644 index 0000000..5456553 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionStateStorageProvider.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tephra.runtime; + +import com.google.inject.Inject; +import com.google.inject.Injector; +import com.google.inject.Key; +import com.google.inject.Provider; +import com.google.inject.Singleton; +import com.google.inject.name.Names; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.TxConstants; +import org.apache.tephra.persist.NoOpTransactionStateStorage; +import org.apache.tephra.persist.TransactionStateStorage; + +/** + * A provider for {@link TransactionStateStorage} that provides different + * {@link TransactionStateStorage} implementation based on configuration. + */ +@Singleton +public final class TransactionStateStorageProvider implements Provider<TransactionStateStorage> { + + private final Configuration cConf; + private final Injector injector; + + @Inject + TransactionStateStorageProvider(Configuration cConf, Injector injector) { + this.cConf = cConf; + this.injector = injector; + } + + @Override + public TransactionStateStorage get() { + if (cConf.getBoolean(TxConstants.Manager.CFG_DO_PERSIST, true)) { + return injector.getInstance(Key.get(TransactionStateStorage.class, Names.named("persist"))); + } else { + return injector.getInstance(NoOpTransactionStateStorage.class); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/runtime/ZKModule.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/ZKModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/ZKModule.java new file mode 100644 index 0000000..da3e019 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/runtime/ZKModule.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.runtime; + +import com.google.common.collect.ArrayListMultimap; +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import com.google.inject.Singleton; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.TxConstants; +import org.apache.tephra.zookeeper.TephraZKClientService; +import org.apache.twill.zookeeper.RetryStrategies; +import org.apache.twill.zookeeper.ZKClient; +import org.apache.twill.zookeeper.ZKClientService; +import org.apache.twill.zookeeper.ZKClientServices; +import org.apache.twill.zookeeper.ZKClients; + +import java.util.concurrent.TimeUnit; + +/** + * Provides Guice binding to {@link ZKClient} and {@link ZKClientService}. + */ +public class ZKModule extends AbstractModule { + + @Override + protected void configure() { + /** + * ZKClientService is provided by the provider method + * {@link #provideZKClientService(org.apache.hadoop.conf.Configuration)}. + */ + bind(ZKClient.class).to(ZKClientService.class); + } + + @Provides + @Singleton + private ZKClientService provideZKClientService(Configuration conf) { + String zkStr = conf.get(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM); + if (zkStr == null) { + // Default to HBase one. 
+ zkStr = conf.get(TxConstants.HBase.ZOOKEEPER_QUORUM); + } + + int timeOut = conf.getInt(TxConstants.HBase.ZK_SESSION_TIMEOUT, TxConstants.HBase.DEFAULT_ZK_SESSION_TIMEOUT); + ZKClientService zkClientService = new TephraZKClientService(zkStr, timeOut, null, + ArrayListMultimap.<String, byte[]>create()); + return ZKClientServices.delegate( + ZKClients.reWatchOnExpire( + ZKClients.retryOnFailure(zkClientService, RetryStrategies.exponentialDelay(500, 2000, TimeUnit.MILLISECONDS) + ) + ) + ); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryDecoder.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryDecoder.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryDecoder.java new file mode 100644 index 0000000..a1cd6dd --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/BinaryDecoder.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * A decoder to help read snapshots in binary format.
 *
 * <p>Integers are read as variable-length values: each byte contributes its low seven bits, a byte
 * greater than {@code 0x7f} means another byte follows, and the assembled value is zig-zag decoded
 * so that small negative numbers also use few bytes.</p>
 */
public final class BinaryDecoder {

  private final InputStream input;

  /**
   * @param input Stream to read from.
   */
  public BinaryDecoder(InputStream input) {
    this.input = input;
  }

  /**
   * Read one int from the input.
   * @return the read number
   * @throws java.io.IOException If there is IO error.
   * @throws java.io.EOFException If end of file reached.
   */
  public int readInt() throws IOException {
    int val = 0;
    int shift = 0;
    int b = readByte();
    // Bytes above 0x7f carry the continuation bit: take the low 7 bits and keep reading.
    while (b > 0x7f) {
      val ^= (b & 0x7f) << shift;
      shift += 7;
      b = readByte();
    }
    val ^= b << shift;
    // Undo the zig-zag mapping (lowest bit is the sign).
    return (val >>> 1) ^ -(val & 1);
  }

  /**
   * Read one long int from the input.
   * @return the read number
   * @throws java.io.IOException If there is IO error.
   * @throws java.io.EOFException If end of file reached.
   */
  public long readLong() throws IOException {
    long val = 0;
    int shift = 0;
    int b = readByte();
    // Bytes above 0x7f carry the continuation bit: take the low 7 bits and keep reading.
    while (b > 0x7f) {
      val ^= (long) (b & 0x7f) << shift;
      shift += 7;
      b = readByte();
    }
    val ^= (long) b << shift;
    // Undo the zig-zag mapping (lowest bit is the sign).
    return (val >>> 1) ^ -(val & 1);
  }

  /**
   * Read a byte sequence. First read an int to indicate how many bytes to read, then that many bytes.
   * @return the read bytes as a byte array
   * @throws java.io.IOException If there is IO error, or if the decoded length is negative.
   * @throws java.io.EOFException If end of file reached.
   */
  public byte[] readBytes() throws IOException {
    int toRead = readInt();
    if (toRead < 0) {
      // A corrupt or truncated stream can decode to a negative length; report it as an IO problem
      // instead of letting the array allocation fail with NegativeArraySizeException.
      throw new IOException("Invalid negative length for byte sequence: " + toRead);
    }
    byte[] bytes = new byte[toRead];
    // A single read() may return fewer bytes than requested, so loop until the array is full.
    while (toRead > 0) {
      int byteRead = input.read(bytes, bytes.length - toRead, toRead);
      if (byteRead == -1) {
        throw new EOFException();
      }
      toRead -= byteRead;
    }
    return bytes;
  }

  /**
   * Reads a single byte value.
   *
   * @return The byte value read.
   * @throws java.io.IOException If there is IO error.
   * @throws java.io.EOFException If end of file reached.
   */
  private int readByte() throws IOException {
    int b = input.read();
    if (b == -1) {
      throw new EOFException();
    }
    return b;
  }
}
+ * @throws java.io.IOException If there is IO error. + */ + public BinaryEncoder writeInt(int i) throws IOException { + // Compute the zig-zag value. First double the value and flip the bit if the input is negative. + int val = (i << 1) ^ (i >> 31); + + if ((val & ~0x7f) != 0) { + output.write(0x80 | val & 0x7f); + val >>>= 7; + while (val > 0x7f) { + output.write(0x80 | val & 0x7f); + val >>>= 7; + } + } + output.write(val); + + return this; + } + + /** + * write a single long int value. + * @throws java.io.IOException If there is IO error. + */ + public BinaryEncoder writeLong(long l) throws IOException { + // Compute the zig-zag value. First double the value and flip the bit if the input is negative. + long val = (l << 1) ^ (l >> 63); + + if ((val & ~0x7f) != 0) { + output.write((int) (0x80 | val & 0x7f)); + val >>>= 7; + while (val > 0x7f) { + output.write((int) (0x80 | val & 0x7f)); + val >>>= 7; + } + } + output.write((int) val); + + return this; + } + + /** + * write a sequence of bytes. First writes the number of bytes as an int, then the bytes themselves. + * @throws java.io.IOException If there is IO error. + */ + public BinaryEncoder writeBytes(byte[] bytes) throws IOException { + writeLong(bytes.length); + output.write(bytes, 0, bytes.length); + return this; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/DefaultSnapshotCodec.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/DefaultSnapshotCodec.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/DefaultSnapshotCodec.java new file mode 100644 index 0000000..4a94c74 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/DefaultSnapshotCodec.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.snapshot; + +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.tephra.ChangeId; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.persist.TransactionSnapshot; +import org.apache.tephra.persist.TransactionVisibilityState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collection; +import java.util.HashSet; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; + +/** + * Handles serialization/deserialization of a {@link TransactionSnapshot} + * and its elements to {@code byte[]}. + * @deprecated This codec is now deprecated and is replaced by {@link SnapshotCodecV2}. 
+ */ +@Deprecated +public class DefaultSnapshotCodec implements SnapshotCodec { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultSnapshotCodec.class); + + @Override + public int getVersion() { + return 1; + } + + @Override + public void encode(OutputStream out, TransactionSnapshot snapshot) { + try { + BinaryEncoder encoder = new BinaryEncoder(out); + + encoder.writeLong(snapshot.getTimestamp()); + encoder.writeLong(snapshot.getReadPointer()); + encoder.writeLong(snapshot.getWritePointer()); + encodeInvalid(encoder, snapshot.getInvalid()); + encodeInProgress(encoder, snapshot.getInProgress()); + encodeChangeSets(encoder, snapshot.getCommittingChangeSets()); + encodeChangeSets(encoder, snapshot.getCommittedChangeSets()); + + } catch (IOException e) { + LOG.error("Unable to serialize transaction state: ", e); + throw Throwables.propagate(e); + } + } + + @Override + public TransactionSnapshot decode(InputStream in) { + BinaryDecoder decoder = new BinaryDecoder(in); + + try { + TransactionVisibilityState minTxSnapshot = decodeTransactionVisibilityState(in); + NavigableMap<Long, Set<ChangeId>> committing = decodeChangeSets(decoder); + NavigableMap<Long, Set<ChangeId>> committed = decodeChangeSets(decoder); + return new TransactionSnapshot(minTxSnapshot.getTimestamp(), minTxSnapshot.getReadPointer(), + minTxSnapshot.getWritePointer(), minTxSnapshot.getInvalid(), + minTxSnapshot.getInProgress(), committing, committed); + } catch (IOException e) { + LOG.error("Unable to deserialize transaction state: ", e); + throw Throwables.propagate(e); + } + } + + @Override + public TransactionVisibilityState decodeTransactionVisibilityState(InputStream in) { + BinaryDecoder decoder = new BinaryDecoder(in); + try { + long timestamp = decoder.readLong(); + long readPointer = decoder.readLong(); + long writePointer = decoder.readLong(); + Collection<Long> invalid = decodeInvalid(decoder); + NavigableMap<Long, TransactionManager.InProgressTx> inProgress = 
decodeInProgress(decoder); + return new TransactionSnapshot(timestamp, readPointer, writePointer, invalid, inProgress); + } catch (IOException e) { + LOG.error("Unable to deserialize transaction state: ", e); + throw Throwables.propagate(e); + } + } + + private void encodeInvalid(BinaryEncoder encoder, Collection<Long> invalid) throws IOException { + if (!invalid.isEmpty()) { + encoder.writeInt(invalid.size()); + for (long invalidTx : invalid) { + encoder.writeLong(invalidTx); + } + } + encoder.writeInt(0); // zero denotes end of list as per AVRO spec + } + + private Collection<Long> decodeInvalid(BinaryDecoder decoder) throws IOException { + int size = decoder.readInt(); + Collection<Long> invalid = Lists.newArrayListWithCapacity(size); + while (size != 0) { // zero denotes end of list as per AVRO spec + for (int remaining = size; remaining > 0; --remaining) { + invalid.add(decoder.readLong()); + } + size = decoder.readInt(); + } + return invalid; + } + + protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress) + throws IOException { + + if (!inProgress.isEmpty()) { + encoder.writeInt(inProgress.size()); + for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) { + encoder.writeLong(entry.getKey()); // tx id + encoder.writeLong(entry.getValue().getExpiration()); + encoder.writeLong(entry.getValue().getVisibilityUpperBound()); + } + } + encoder.writeInt(0); // zero denotes end of list as per AVRO spec + } + + protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder) + throws IOException { + + int size = decoder.readInt(); + NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(); + while (size != 0) { // zero denotes end of list as per AVRO spec + for (int remaining = size; remaining > 0; --remaining) { + long txId = decoder.readLong(); + long expiration = decoder.readLong(); + long visibilityUpperBound = 
decoder.readLong(); + inProgress.put(txId, + new TransactionManager.InProgressTx(visibilityUpperBound, expiration)); + } + size = decoder.readInt(); + } + return inProgress; + } + + private void encodeChangeSets(BinaryEncoder encoder, Map<Long, Set<ChangeId>> changes) throws IOException { + if (!changes.isEmpty()) { + encoder.writeInt(changes.size()); + for (Map.Entry<Long, Set<ChangeId>> entry : changes.entrySet()) { + encoder.writeLong(entry.getKey()); + encodeChanges(encoder, entry.getValue()); + } + } + encoder.writeInt(0); // zero denotes end of list as per AVRO spec + } + + private NavigableMap<Long, Set<ChangeId>> decodeChangeSets(BinaryDecoder decoder) throws IOException { + int size = decoder.readInt(); + NavigableMap<Long, Set<ChangeId>> changeSets = new TreeMap<Long, Set<ChangeId>>(); + while (size != 0) { // zero denotes end of list as per AVRO spec + for (int remaining = size; remaining > 0; --remaining) { + changeSets.put(decoder.readLong(), decodeChanges(decoder)); + } + size = decoder.readInt(); + } + return changeSets; + } + + private void encodeChanges(BinaryEncoder encoder, Set<ChangeId> changes) throws IOException { + if (!changes.isEmpty()) { + encoder.writeInt(changes.size()); + for (ChangeId change : changes) { + encoder.writeBytes(change.getKey()); + } + } + encoder.writeInt(0); // zero denotes end of list as per AVRO spec + } + + private Set<ChangeId> decodeChanges(BinaryDecoder decoder) throws IOException { + int size = decoder.readInt(); + HashSet<ChangeId> changes = Sets.newHashSetWithExpectedSize(size); + while (size != 0) { // zero denotes end of list as per AVRO spec + for (int remaining = size; remaining > 0; --remaining) { + changes.add(new ChangeId(decoder.readBytes())); + } + size = decoder.readInt(); + } + // todo is there an immutable hash set? 
+ return changes; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodec.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodec.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodec.java new file mode 100644 index 0000000..e2c5c16 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodec.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.snapshot; + +import org.apache.tephra.persist.TransactionSnapshot; +import org.apache.tephra.persist.TransactionVisibilityState; + +import java.io.InputStream; +import java.io.OutputStream; + +/** + * Interface to decode and encode a transaction snapshot. Each codec implements one version of the encoding. + * It need not include the version when encoding the snapshot. + */ +public interface SnapshotCodec { + + /** + * @return the version of the encoding implemented by the codec. + */ + int getVersion(); + + /** + * Encode a transaction snapshot into an output stream. 
+ * @param out the output stream to write to + * @param snapshot the snapshot to encode + */ + void encode(OutputStream out, TransactionSnapshot snapshot); + + /** + * Decode a transaction snapshot from an input stream. + * @param in the input stream to read from + * @return the decoded snapshot + */ + TransactionSnapshot decode(InputStream in); + + /** + * Decode transaction visibility state from an input stream. + * @param in the input stream to read from + * @return {@link TransactionVisibilityState} + */ + TransactionVisibilityState decodeTransactionVisibilityState(InputStream in); +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecProvider.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecProvider.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecProvider.java new file mode 100644 index 0000000..3756846 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecProvider.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra.snapshot; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.inject.Inject; +import org.apache.hadoop.conf.Configuration; +import org.apache.tephra.TxConstants; +import org.apache.tephra.persist.TransactionSnapshot; +import org.apache.tephra.persist.TransactionVisibilityState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.List; +import java.util.SortedMap; +import javax.annotation.Nonnull; + +/** + * Maintains the codecs for all known versions of the transaction snapshot encoding. + */ +public class SnapshotCodecProvider implements SnapshotCodec { + + private static final Logger LOG = LoggerFactory.getLogger(SnapshotCodecProvider.class); + + private final SortedMap<Integer, SnapshotCodec> codecs = Maps.newTreeMap(); + + @Inject + public SnapshotCodecProvider(Configuration configuration) { + initialize(configuration); + } + + /** + * Register all codec specified in the configuration with this provider. + * There can only be one codec for a given version. 
+ */ + private void initialize(Configuration configuration) { + String[] codecClassNames = configuration.getTrimmedStrings(TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES); + List<Class> codecClasses = Lists.newArrayList(); + if (codecClassNames != null) { + for (String clsName : codecClassNames) { + try { + codecClasses.add(Class.forName(clsName)); + } catch (ClassNotFoundException cnfe) { + LOG.warn("Unable to load class configured for " + TxConstants.Persist.CFG_TX_SNAPHOT_CODEC_CLASSES + + ": " + clsName, cnfe); + } + } + } + + if (codecClasses.size() == 0) { + codecClasses.addAll(Arrays.asList(TxConstants.Persist.DEFAULT_TX_SNAPHOT_CODEC_CLASSES)); + } + for (Class<?> codecClass : codecClasses) { + try { + SnapshotCodec codec = (SnapshotCodec) (codecClass.newInstance()); + codecs.put(codec.getVersion(), codec); + LOG.debug("Using snapshot codec {} for snapshots of version {}", codecClass.getName(), codec.getVersion()); + } catch (Exception e) { + LOG.warn("Error instantiating snapshot codec {}. Skipping.", codecClass.getName(), e); + } + } + } + + /** + * Retrieve the codec for a particular version of the encoding. + * @param version the version of interest + * @return the corresponding codec + * @throws java.lang.IllegalArgumentException if the version is not known + */ + @Nonnull + @VisibleForTesting + SnapshotCodec getCodecForVersion(int version) { + SnapshotCodec codec = codecs.get(version); + if (codec == null) { + throw new IllegalArgumentException(String.format("Version %d of snapshot encoding is not supported", version)); + } + return codec; + } + + /** + * Retrieve the current snapshot codec, that is, the codec with the highest known version. 
+ * @return the current codec + * @throws java.lang.IllegalStateException if no codecs are registered + */ + private SnapshotCodec getCurrentCodec() { + if (codecs.isEmpty()) { + throw new IllegalStateException(String.format("No codecs are registered.")); + } + return codecs.get(codecs.lastKey()); + } + + // Return the appropriate codec for the version in InputStream + private SnapshotCodec getCodec(InputStream in) { + BinaryDecoder decoder = new BinaryDecoder(in); + int persistedVersion; + try { + persistedVersion = decoder.readInt(); + } catch (IOException e) { + LOG.error("Unable to read transaction state version: ", e); + throw Throwables.propagate(e); + } + return getCodecForVersion(persistedVersion); + } + + @Override + public int getVersion() { + return getCurrentCodec().getVersion(); + } + + @Override + public TransactionSnapshot decode(InputStream in) { + return getCodec(in).decode(in); + } + + @Override + public TransactionVisibilityState decodeTransactionVisibilityState(InputStream in) { + return getCodec(in).decodeTransactionVisibilityState(in); + } + + @Override + public void encode(OutputStream out, TransactionSnapshot snapshot) { + SnapshotCodec codec = getCurrentCodec(); + try { + new BinaryEncoder(out).writeInt(codec.getVersion()); + } catch (IOException e) { + LOG.error("Unable to write transaction state version: ", e); + throw Throwables.propagate(e); + } + codec.encode(out, snapshot); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java new file mode 100644 index 0000000..ccf026d --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV2.java @@ -0,0 +1,83 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.snapshot; + +import com.google.common.collect.Maps; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionType; +import org.apache.tephra.persist.TransactionSnapshot; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; + +/** + * Handles serialization/deserialization of a {@link TransactionSnapshot} + * and its elements to {@code byte[]}. 
+ */ +public class SnapshotCodecV2 extends DefaultSnapshotCodec { + @Override + public int getVersion() { + return 2; + } + + @Override + protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress) + throws IOException { + + if (!inProgress.isEmpty()) { + encoder.writeInt(inProgress.size()); + for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) { + encoder.writeLong(entry.getKey()); // tx id + encoder.writeLong(entry.getValue().getExpiration()); + encoder.writeLong(entry.getValue().getVisibilityUpperBound()); + encoder.writeInt(entry.getValue().getType().ordinal()); + } + } + encoder.writeInt(0); // zero denotes end of list as per AVRO spec + } + + @Override + protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder) + throws IOException { + + int size = decoder.readInt(); + NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(); + while (size != 0) { // zero denotes end of list as per AVRO spec + for (int remaining = size; remaining > 0; --remaining) { + long txId = decoder.readLong(); + long expiration = decoder.readLong(); + long visibilityUpperBound = decoder.readLong(); + int txTypeIdx = decoder.readInt(); + TransactionType txType; + try { + txType = TransactionType.values()[txTypeIdx]; + } catch (ArrayIndexOutOfBoundsException e) { + throw new IOException("Type enum ordinal value is out of range: " + txTypeIdx); + } + inProgress.put(txId, + new TransactionManager.InProgressTx(visibilityUpperBound, expiration, txType, + new LongArrayList())); + } + size = decoder.readInt(); + } + return inProgress; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV3.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV3.java 
b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV3.java new file mode 100644 index 0000000..1b9e2b3 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV3.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.snapshot; + +import org.apache.tephra.persist.TransactionSnapshot; + +/** + * Handles serialization/deserialization of a {@link TransactionSnapshot} + * and its elements to {@code byte[]}. 
+ * + * <p>The serialization/deserialization of this codec is the same as that performed by {@link SnapshotCodecV2}, + * but a new version number is used to allow easy migration from projects using deprecated codecs with + * conflicting version numbers.</p> + */ +public class SnapshotCodecV3 extends SnapshotCodecV2 { + @Override + public int getVersion() { + return 3; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java new file mode 100644 index 0000000..cadaa8e --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/SnapshotCodecV4.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra.snapshot; + +import com.google.common.collect.Maps; +import it.unimi.dsi.fastutil.longs.LongArrayList; +import org.apache.tephra.TransactionManager; +import org.apache.tephra.TransactionType; +import org.apache.tephra.persist.TransactionSnapshot; + +import java.io.IOException; +import java.util.Map; +import java.util.NavigableMap; + +/** + * Handles serialization/deserialization of a {@link TransactionSnapshot} + * and its elements to {@code byte[]}. + * + */ +public class SnapshotCodecV4 extends SnapshotCodecV2 { + @Override + public int getVersion() { + return 4; + } + + @Override + protected void encodeInProgress(BinaryEncoder encoder, Map<Long, TransactionManager.InProgressTx> inProgress) + throws IOException { + + if (!inProgress.isEmpty()) { + encoder.writeInt(inProgress.size()); + for (Map.Entry<Long, TransactionManager.InProgressTx> entry : inProgress.entrySet()) { + encoder.writeLong(entry.getKey()); // tx id + encoder.writeLong(entry.getValue().getExpiration()); + encoder.writeLong(entry.getValue().getVisibilityUpperBound()); + encoder.writeInt(entry.getValue().getType().ordinal()); + // write checkpoint tx IDs + LongArrayList checkpointPointers = entry.getValue().getCheckpointWritePointers(); + if (checkpointPointers != null && !checkpointPointers.isEmpty()) { + encoder.writeInt(checkpointPointers.size()); + for (int i = 0; i < checkpointPointers.size(); i++) { + encoder.writeLong(checkpointPointers.getLong(i)); + } + } + encoder.writeInt(0); + } + } + encoder.writeInt(0); // zero denotes end of list as per AVRO spec + } + + @Override + protected NavigableMap<Long, TransactionManager.InProgressTx> decodeInProgress(BinaryDecoder decoder) + throws IOException { + + int size = decoder.readInt(); + NavigableMap<Long, TransactionManager.InProgressTx> inProgress = Maps.newTreeMap(); + while (size != 0) { // zero denotes end of list as per AVRO spec + for (int remaining = size; remaining > 0; --remaining) { + long txId = 
decoder.readLong(); + long expiration = decoder.readLong(); + long visibilityUpperBound = decoder.readLong(); + int txTypeIdx = decoder.readInt(); + TransactionType txType; + try { + txType = TransactionType.values()[txTypeIdx]; + } catch (ArrayIndexOutOfBoundsException e) { + throw new IOException("Type enum ordinal value is out of range: " + txTypeIdx); + } + // read checkpoint tx IDs + int checkpointPointerSize = decoder.readInt(); + LongArrayList checkpointPointers = new LongArrayList(checkpointPointerSize); + while (checkpointPointerSize != 0) { + for (int checkpointRemaining = checkpointPointerSize; checkpointRemaining > 0; --checkpointRemaining) { + checkpointPointers.add(decoder.readLong()); + } + checkpointPointerSize = decoder.readInt(); + } + inProgress.put(txId, + new TransactionManager.InProgressTx(visibilityUpperBound, expiration, txType, checkpointPointers)); + } + size = decoder.readInt(); + } + return inProgress; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/snapshot/package-info.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/snapshot/package-info.java b/tephra-core/src/main/java/org/apache/tephra/snapshot/package-info.java new file mode 100644 index 0000000..cf1a276 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/snapshot/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains interfaces and implementations for encoding and decoding transaction snapshots. + */ +package org.apache.tephra.snapshot; http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationFactory.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationFactory.java b/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationFactory.java new file mode 100644 index 0000000..a13668c --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tephra.util; + +import com.google.inject.Provider; +import org.apache.hadoop.conf.Configuration; + +/** + * Provides {@code org.apache.hadoop.conf.Configuration} instances, constructed by the correct version used + * for the runtime. + */ +public class ConfigurationFactory implements Provider<Configuration> { + private static class ConfigurationProviderFactory extends HBaseVersionSpecificFactory<ConfigurationProvider> { + @Override + protected String getHBase96Classname() { + return "org.apache.tephra.hbase96.HBase96ConfigurationProvider"; + } + + @Override + protected String getHBase98Classname() { + return "org.apache.tephra.hbase98.HBase98ConfigurationProvider"; + } + + @Override + protected String getHBase10Classname() { + return "org.apache.tephra.hbase10.HBase10ConfigurationProvider"; + } + + @Override + protected String getHBase11Classname() { + return "org.apache.tephra.hbase11.HBase11ConfigurationProvider"; + } + + @Override + protected String getHBase10CDHClassname() { + return "org.apache.tephra.hbase10cdh.HBase10ConfigurationProvider"; + } + } + + private final ConfigurationProvider provider = new ConfigurationProviderFactory().get(); + + /** + * Returns a new {@link org.apache.hadoop.conf.Configuration} instance from the HBase version-specific factory. + */ + @Override + public Configuration get() { + return provider.get(); + } + + /** + * Returns a new {@link org.apache.hadoop.conf.Configuration} instance from the HBase version-specific factory. 
+ * + * @param baseConf additional configuration properties to merge on to the classpath configuration + * @return the merged configuration + */ + public Configuration get(Configuration baseConf) { + return provider.get(baseConf); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationProvider.java b/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationProvider.java new file mode 100644 index 0000000..e133c69 --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/util/ConfigurationProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.util; + +import com.google.inject.Provider; +import org.apache.hadoop.conf.Configuration; + +/** + * Provides {@code Configuration} instances, constructed by the HBase version on which we are running. 
+ */ +public abstract class ConfigurationProvider implements Provider<Configuration> { + @Override + public abstract Configuration get(); + + public abstract Configuration get(Configuration baseConf); +} http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/9c693743/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersion.java ---------------------------------------------------------------------- diff --git a/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersion.java b/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersion.java new file mode 100644 index 0000000..687e46d --- /dev/null +++ b/tephra-core/src/main/java/org/apache/tephra/util/HBaseVersion.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tephra.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Method; +import java.text.ParseException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Detects the currently loaded HBase version. It is assumed that only one HBase version is loaded at a time, + * since using more than one HBase version within the same process will require classloader isolation anyway. 
+ */ +public class HBaseVersion { + private static final String HBASE_94_VERSION = "0.94"; + private static final String HBASE_96_VERSION = "0.96"; + private static final String HBASE_98_VERSION = "0.98"; + private static final String HBASE_10_VERSION = "1.0"; + private static final String HBASE_11_VERSION = "1.1"; + private static final String HBASE_12_VERSION = "1.2"; + private static final String CDH_CLASSIFIER = "cdh"; + + private static final Logger LOG = LoggerFactory.getLogger(HBaseVersion.class); + + /** + * Represents the major version of the HBase library that is currently loaded. + */ + public enum Version { + HBASE_94("0.94"), + HBASE_96("0.96"), + HBASE_98("0.98"), + HBASE_10("1.0"), + HBASE_10_CDH("1.0-cdh"), + HBASE_11("1.1"), + HBASE_12_CDH("1.2-cdh"), + UNKNOWN("unknown"); + + final String majorVersion; + + Version(String majorVersion) { + this.majorVersion = majorVersion; + } + + public String getMajorVersion() { + return majorVersion; + } + } + + private static Version currentVersion; + private static String versionString; + static { + try { + Class versionInfoClass = Class.forName("org.apache.hadoop.hbase.util.VersionInfo"); + Method versionMethod = versionInfoClass.getMethod("getVersion"); + versionString = (String) versionMethod.invoke(null); + if (versionString.startsWith(HBASE_94_VERSION)) { + currentVersion = Version.HBASE_94; + } else if (versionString.startsWith(HBASE_96_VERSION)) { + currentVersion = Version.HBASE_96; + } else if (versionString.startsWith(HBASE_98_VERSION)) { + currentVersion = Version.HBASE_98; + } else if (versionString.startsWith(HBASE_10_VERSION)) { + VersionNumber ver = VersionNumber.create(versionString); + if (ver.getClassifier() != null && ver.getClassifier().startsWith(CDH_CLASSIFIER)) { + currentVersion = Version.HBASE_10_CDH; + } else { + currentVersion = Version.HBASE_10; + } + } else if (versionString.startsWith(HBASE_11_VERSION)) { + currentVersion = Version.HBASE_11; + } else if 
(versionString.startsWith(HBASE_12_VERSION)) { + VersionNumber ver = VersionNumber.create(versionString); + if (ver.getClassifier() != null && ver.getClassifier().startsWith(CDH_CLASSIFIER)) { + currentVersion = Version.HBASE_12_CDH; + } else { + // CDH 5.7 comes with HBase version 1.2.0-CDH5.7.0. However currently there is no + // other hadoop distribution that uses HBase 1.2, so the version is set here to UNKNOWN. + currentVersion = Version.UNKNOWN; + } + } else { + currentVersion = Version.UNKNOWN; + } + } catch (Throwable e) { + // must be a class loading exception, HBase is not there + LOG.error("Unable to determine HBase version from string '{}', are HBase classes available?", versionString); + LOG.error("Exception was: ", e); + currentVersion = Version.UNKNOWN; + } + } + + /** + * Returns the major version of the currently loaded HBase library. + */ + public static Version get() { + return currentVersion; + } + + /** + * Returns the full version string for the currently loaded HBase library. + */ + public static String getVersionString() { + return versionString; + } + + /** + * Prints out the HBase {@link Version} enum value for the current version of HBase on the classpath. + */ + public static void main(String[] args) { + boolean verbose = args.length == 1 && "-v".equals(args[0]); + Version version = HBaseVersion.get(); + System.out.println(version.getMajorVersion()); + if (verbose) { + System.out.println("versionString=" + getVersionString()); + } + } + + /** + * Utility class to parse apart version number components. 
The version string provided is expected to be in + * the format: major[.minor[.patch[.last]][-classifier][-SNAPSHOT] + * + * <p>Only the major version number is actually required.</p> + */ + public static class VersionNumber { + private static final Pattern PATTERN = + Pattern.compile("(\\d+)(\\.(\\d+))?(\\.(\\d+))?(\\.(\\d+))?(\\-(?!SNAPSHOT)([^\\-]+))?(\\-SNAPSHOT)?"); + + private Integer major; + private Integer minor; + private Integer patch; + private Integer last; + private String classifier; + private boolean snapshot; + + private VersionNumber(Integer major, Integer minor, Integer patch, Integer last, + String classifier, boolean snapshot) { + this.major = major; + this.minor = minor; + this.patch = patch; + this.last = last; + this.classifier = classifier; + this.snapshot = snapshot; + } + + public Integer getMajor() { + return major; + } + + public Integer getMinor() { + return minor; + } + + public Integer getPatch() { + return patch; + } + + public Integer getLast() { + return last; + } + + public String getClassifier() { + return classifier; + } + + public boolean isSnapshot() { + return snapshot; + } + + public static VersionNumber create(String versionString) throws ParseException { + Matcher matcher = PATTERN.matcher(versionString); + if (matcher.matches()) { + String majorString = matcher.group(1); + String minorString = matcher.group(3); + String patchString = matcher.group(5); + String last = matcher.group(7); + String classifier = matcher.group(9); + String snapshotString = matcher.group(10); + return new VersionNumber(new Integer(majorString), + minorString != null ? new Integer(minorString) : null, + patchString != null ? new Integer(patchString) : null, + last != null ? new Integer(last) : null, + classifier, + "-SNAPSHOT".equals(snapshotString)); + } + throw new ParseException( + "Input string did not match expected pattern: major[.minor[.patch]][-classifier][-SNAPSHOT]", 0); + } + } +}
