http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfigBuilder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfigBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfigBuilder.java new file mode 100644 index 0000000..a019a77 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfigBuilder.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.coord.store; + +import com.google.common.base.Preconditions; +import org.apache.drill.exec.serialization.InstanceSerializer; + +public class TransientStoreConfigBuilder<T> { + private String name; + private InstanceSerializer<T> serializer; + + protected TransientStoreConfigBuilder() { } + + public String name() { + return name; + } + + public TransientStoreConfigBuilder<T> name(final String name) { + this.name = Preconditions.checkNotNull(name); + return this; + } + + public InstanceSerializer<T> serializer() { + return serializer; + } + + public TransientStoreConfigBuilder<T> serializer(final InstanceSerializer<T> serializer) { + this.serializer = Preconditions.checkNotNull(serializer); + return this; + } + + public TransientStoreConfig<T> build() { + return new TransientStoreConfig<>(name, serializer); + } +}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEvent.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEvent.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEvent.java new file mode 100644 index 0000000..a0b5725 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEvent.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.coord.store; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +/** + * Represents an event created as a result of an operation over a particular (key, value) entry in a + * {@link TransientStore store} instance. 
+ * + * Types of operations are enumerated in {@link TransientStoreEventType} + * + * @param <V> value type + */ +public class TransientStoreEvent<V> { + private final TransientStoreEventType type; + private final String key; + private final V value; + + public TransientStoreEvent(final TransientStoreEventType type, final String key, final V value) { + this.type = Preconditions.checkNotNull(type); + this.key = Preconditions.checkNotNull(key); + this.value = Preconditions.checkNotNull(value); + } + + public String getKey() { + return key; + } + + public TransientStoreEventType getType() { + return type; + } + + public V getValue() { + return value; + } + + @Override + public boolean equals(final Object obj) { + if (obj instanceof TransientStoreEvent && obj.getClass().equals(getClass())) { + final TransientStoreEvent<V> other = (TransientStoreEvent<V>)obj; + return Objects.equal(type, other.type) && Objects.equal(key, other.key) && Objects.equal(value, other.value); + } + return super.equals(obj); + } + + @Override + public int hashCode() { + return Objects.hashCode(type, key, value); + } + + public static <T> TransientStoreEvent<T>of(final TransientStoreEventType type, final String key, final T value) { + return new TransientStoreEvent<>(type, key, value); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEventType.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEventType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEventType.java new file mode 100644 index 0000000..51ae2c7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreEventType.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/**
 * Types of store events.
 */
public enum TransientStoreEventType {
  /** An entry was added. */
  CREATE,

  /** An existing entry was changed. */
  UPDATE,

  /** An entry was removed. */
  DELETE
}
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.coord.store; + +import org.apache.drill.exec.store.sys.PersistentStore; + +/** + * Factory that is used to obtain a {@link TransientStore store} instance. + */ +public interface TransientStoreFactory extends AutoCloseable { + + /** + * Returns a {@link TransientStore transient store} instance for the given configuration. + * + * Note that implementors have liberty to cache previous {@link PersistentStore store} instances. + * + * @param config store configuration + * @param <V> store value type + */ + <V> TransientStore<V> getOrCreateStore(TransientStoreConfig<V> config); +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreListener.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreListener.java new file mode 100644 index 0000000..ca8fa9d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreListener.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.coord.store; + +/** + * A listener used for observing {@link TransientStore transient store} {@link TransientStoreEvent events}. + */ +public interface TransientStoreListener { + + /** + * {@link TransientStore transient store} fires this method with event details upon an observed change. + * + * @param event event details + */ + void onChange(TransientStoreEvent event); + +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/EventDispatcher.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/EventDispatcher.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/EventDispatcher.java new file mode 100644 index 0000000..580cfcd --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/EventDispatcher.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.coord.zk; + +import java.util.Map; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.drill.exec.coord.store.TransientStoreEvent; +import org.apache.drill.exec.coord.store.TransientStoreEventType; + +/** + * An abstraction used for dispatching store {@link TransientStoreEvent events}. 
+ * + * @param <V> value type + */ +public class EventDispatcher<V> implements PathChildrenCacheListener { + public final static Map<PathChildrenCacheEvent.Type, TransientStoreEventType> MAPPINGS = ImmutableMap + .<PathChildrenCacheEvent.Type, TransientStoreEventType>builder() + .put(PathChildrenCacheEvent.Type.CHILD_ADDED, TransientStoreEventType.CREATE) + .put(PathChildrenCacheEvent.Type.CHILD_REMOVED, TransientStoreEventType.DELETE) + .put(PathChildrenCacheEvent.Type.CHILD_UPDATED, TransientStoreEventType.UPDATE) + .build(); + + private final ZkEphemeralStore<V> store; + + protected EventDispatcher(final ZkEphemeralStore<V> store) { + this.store = Preconditions.checkNotNull(store, "store is required"); + } + + @Override + public void childEvent(final CuratorFramework client, final PathChildrenCacheEvent event) throws Exception { + final PathChildrenCacheEvent.Type original = event.getType(); + final TransientStoreEventType mapped = MAPPINGS.get(original); + if (mapped != null) { // dispatch the event to listeners only if it can be mapped + final String path = event.getData().getPath(); + final byte[] bytes = event.getData().getData(); + final V value = store.getConfig().getSerializer().deserialize(bytes); + store.fireListeners(TransientStoreEvent.of(mapped, path, value)); + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/PathUtils.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/PathUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/PathUtils.java new file mode 100644 index 0000000..f01b989 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/PathUtils.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/**
 * A convenience class used to expedite zookeeper paths manipulations.
 *
 * Only JDK classes are used, so the helper carries no third-party dependency.
 */
public final class PathUtils {

  /** Static utility holder; not meant to be instantiated. */
  private PathUtils() { }

  /**
   * Returns a normalized, combined path out of the given path segments.
   *
   * Empty segments are ignored; the remaining ones are joined with a single forward slash and the
   * result is passed through {@link #normalize(String)}.
   *
   * @param parts  path segments to combine
   * @return the combined, normalized path; an empty string when there is nothing to combine
   * @throws NullPointerException if any segment is {@code null}
   * @see #normalize(String)
   */
  public static final String join(final String... parts) {
    final StringBuilder sb = new StringBuilder();
    for (final String part : parts) {
      java.util.Objects.requireNonNull(part, "parts cannot contain null");
      if (part.isEmpty()) {
        continue; // an empty segment contributes nothing, not even a separator
      }
      if (sb.length() > 0) {
        sb.append('/');
      }
      sb.append(part);
    }
    return normalize(sb.toString());
  }

  /**
   * Normalizes the given path eliminating repeated forward slashes.
   *
   * Leading and trailing slashes are kept, each collapsed to a single slash.
   *
   * @param path  path to normalize
   * @return normalized path
   * @throws NullPointerException if {@code path} is {@code null}
   */
  public static final String normalize(final String path) {
    java.util.Objects.requireNonNull(path);

    final int length = path.length();
    final StringBuilder builder = new StringBuilder(length);
    for (int i = 0; i < length; i++) {
      final char cur = path.charAt(i);
      // a slash that directly follows another slash is the repetition being removed
      final boolean repeatedSlash = cur == '/' && i > 0 && path.charAt(i - 1) == '/';
      if (!repeatedSlash) {
        builder.append(cur);
      }
    }
    return builder.toString();
  }

}
org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.DistributedSemaphore; import org.apache.drill.exec.coord.DrillServiceInstanceHelper; +import org.apache.drill.exec.coord.store.CachingTransientStoreFactory; +import org.apache.drill.exec.coord.store.TransientStore; +import org.apache.drill.exec.coord.store.TransientStoreConfig; +import org.apache.drill.exec.coord.store.TransientStoreFactory; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.google.common.base.Function; -import org.apache.drill.exec.work.foreman.DrillbitStatusListener; /** * Manages cluster coordination utilizing zookeeper. * @@ -60,15 +65,14 @@ public class ZKClusterCoordinator extends ClusterCoordinator { private CuratorFramework curator; private ServiceDiscovery<DrillbitEndpoint> discovery; - private ServiceCache<DrillbitEndpoint> serviceCache; private volatile Collection<DrillbitEndpoint> endpoints = Collections.emptyList(); private final String serviceName; private final CountDownLatch initialConnection = new CountDownLatch(1); + private final TransientStoreFactory factory; + private ServiceCache<DrillbitEndpoint> serviceCache; private static final Pattern ZK_COMPLEX_STRING = Pattern.compile("(^.*?)/(.*)/([^/]*)$"); - - public ZKClusterCoordinator(DrillConfig config) throws IOException{ this(config, null); } @@ -100,11 +104,8 @@ public class ZKClusterCoordinator extends ClusterCoordinator { .build(); curator.getConnectionStateListenable().addListener(new InitialConnectionListener()); curator.start(); - discovery = getDiscovery(); - serviceCache = discovery. 
serviceCacheBuilder() - .name(serviceName) - .build(); + discovery = newDiscovery(); + factory = CachingTransientStoreFactory.of(new ZkTransientStoreFactory(curator)); } public CuratorFramework getCurator() { @@ -115,8 +116,6 @@ public class ZKClusterCoordinator extends ClusterCoordinator { public void start(long millisToWait) throws Exception { logger.debug("Starting ZKClusterCoordination."); discovery.start(); - serviceCache.start(); - serviceCache.addListener(new ZKListener()); if(millisToWait != 0) { boolean success = this.initialConnection.await(millisToWait, TimeUnit.MILLISECONDS); @@ -127,6 +126,12 @@ public class ZKClusterCoordinator extends ClusterCoordinator { this.initialConnection.await(); } + serviceCache = discovery + .serviceCacheBuilder() + .name(serviceName) + .build(); + serviceCache.addListener(new EndpointListener()); + serviceCache.start(); updateEndpoints(); } @@ -142,29 +147,28 @@ public class ZKClusterCoordinator extends ClusterCoordinator { } - private class ZKListener implements ServiceCacheListener { - + private class EndpointListener implements ServiceCacheListener { @Override - public void stateChanged(CuratorFramework client, ConnectionState newState) { - } + public void stateChanged(CuratorFramework client, ConnectionState newState) { } @Override public void cacheChanged() { - logger.debug("Cache changed, updating."); + logger.debug("Got cache changed --> updating endpoints"); updateEndpoints(); } } - public void close() throws IOException { - serviceCache.close(); - discovery.close(); - curator.close(); + public void close() throws Exception { + // Discovery already attempts to close its caches (i.e. serviceCache). Even so, we make sure to + // explicitly close serviceCache, and to close it before discovery, in order to prevent double + // releasing and to keep the JVM from emitting spurious warnings. 
+ AutoCloseables.close(serviceCache, discovery, curator, factory); } @Override public RegistrationHandle register(DrillbitEndpoint data) { try { - ServiceInstance<DrillbitEndpoint> serviceInstance = getServiceInstance(data); + ServiceInstance<DrillbitEndpoint> serviceInstance = newServiceInstance(data); discovery.registerService(serviceInstance); return new ZKRegistrationHandle(serviceInstance.getId()); } catch (Exception e) { @@ -206,6 +210,11 @@ public class ZKClusterCoordinator extends ClusterCoordinator { return new ZkDistributedSemaphore(curator, "/semaphore/" + name, maximumLeases); } + @Override + public <V> TransientStore<V> getOrCreateTransientStore(final TransientStoreConfig<V> config) { + final ZkEphemeralStore<V> store = (ZkEphemeralStore<V>)factory.getOrCreateStore(config); + return store; + } private synchronized void updateEndpoints() { try { @@ -253,7 +262,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator { } } - private ServiceInstance<DrillbitEndpoint> getServiceInstance(DrillbitEndpoint endpoint) throws Exception { + protected ServiceInstance<DrillbitEndpoint> newServiceInstance(DrillbitEndpoint endpoint) throws Exception { return ServiceInstance.<DrillbitEndpoint>builder() .name(serviceName) .payload(endpoint) @@ -261,7 +270,7 @@ public class ZKClusterCoordinator extends ClusterCoordinator { } - public ServiceDiscovery<DrillbitEndpoint> getDiscovery() { + protected ServiceDiscovery<DrillbitEndpoint> newDiscovery() { return ServiceDiscoveryBuilder .builder(DrillbitEndpoint.class) .basePath("/") http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java new file mode 100644 index 0000000..94e03ad --- 
/dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.coord.zk; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import javax.annotation.Nullable; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.drill.common.collections.ImmutableEntry; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.coord.store.BaseTransientStore; +import org.apache.drill.exec.coord.store.TransientStoreConfig; +import org.apache.drill.exec.coord.store.TransientStoreEvent; +import org.apache.drill.exec.serialization.InstanceSerializer; +import org.apache.zookeeper.CreateMode; + +public class ZkEphemeralStore<V> extends BaseTransientStore<V> { + + @VisibleForTesting + protected final PathChildrenCacheListener dispatcher = new 
EventDispatcher<>(this); + private final ZookeeperClient client; + + public ZkEphemeralStore(final TransientStoreConfig<V> config, final CuratorFramework curator) { + super(config); + this.client = new ZookeeperClient(curator, PathUtils.join("/", config.getName()), CreateMode.EPHEMERAL); + } + + public void start() throws Exception { + getClient().getCache().getListenable().addListener(dispatcher); + getClient().start(); + } + + protected ZookeeperClient getClient() { + return client; + } + + @Override + public V get(final String key) { + final byte[] bytes = getClient().get(key); + if (bytes == null) { + return null; + } + try { + return config.getSerializer().deserialize(bytes); + } catch (final IOException e) { + throw new DrillRuntimeException(String.format("unable to deserialize value at %s", key), e); + } + } + + @Override + public V put(final String key, final V value) { + final InstanceSerializer<V> serializer = config.getSerializer(); + try { + final byte[] old = getClient().get(key); + final byte[] bytes = serializer.serialize(value); + getClient().put(key, bytes); + if (old == null) { + return null; + } + return serializer.deserialize(old); + } catch (final IOException e) { + throw new DrillRuntimeException(String.format("unable to de/serialize value of type %s", value.getClass()), e); + } + } + + @Override + public V putIfAbsent(final String key, final V value) { + final V old = get(key); + if (old == null) { + try { + final byte[] bytes = config.getSerializer().serialize(value); + getClient().put(key, bytes); + } catch (final IOException e) { + throw new DrillRuntimeException(String.format("unable to serialize value of type %s", value.getClass()), e); + } + } + return old; + } + + @Override + public V remove(final String key) { + final V existing = get(key); + if (existing != null) { + getClient().delete(key); + } + return existing; + } + + @Override + public Iterator<Map.Entry<String, V>> entries() { + return Iterators.transform(getClient().entries(), 
new Function<Map.Entry<String, byte[]>, Map.Entry<String, V>>() { + @Nullable + @Override + public Map.Entry<String, V> apply(@Nullable Map.Entry<String, byte[]> input) { + try { + final V value = config.getSerializer().deserialize(input.getValue()); + return new ImmutableEntry<>(input.getKey(), value); + } catch (final IOException e) { + throw new DrillRuntimeException(String.format("unable to deserialize value at key %s", input.getKey()), e); + } + } + }); + } + + @Override + public int size() { + return getClient().getCache().getCurrentData().size(); + } + + @Override + public void close() throws Exception { + getClient().close(); + } + + /** + * This method override ensures package level method visibility. + */ + @Override + protected void fireListeners(TransientStoreEvent event) { + super.fireListeners(event); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkTransientStoreFactory.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkTransientStoreFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkTransientStoreFactory.java new file mode 100644 index 0000000..a58c376 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkTransientStoreFactory.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.coord.zk; + +import com.google.common.base.Preconditions; +import org.apache.curator.framework.CuratorFramework; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.coord.store.TransientStore; +import org.apache.drill.exec.coord.store.TransientStoreConfig; +import org.apache.drill.exec.coord.store.TransientStoreFactory; + +public class ZkTransientStoreFactory implements TransientStoreFactory { + + private final CuratorFramework curator; + + public ZkTransientStoreFactory(final CuratorFramework curator) { + this.curator = Preconditions.checkNotNull(curator, "curator is required"); + } + + @Override + public <V> ZkEphemeralStore<V> getOrCreateStore(TransientStoreConfig<V> config) { + final ZkEphemeralStore<V> store = new ZkEphemeralStore<>(config, curator); + try { + store.start(); + } catch (final Exception e) { + throw new DrillRuntimeException("unable to start zookeeper transient store", e); + } + return store; + } + + @Override + public void close() throws Exception { + + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java new file mode 100644 index 0000000..1c33f71 --- /dev/null +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java @@ -0,0 +1,238 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.coord.zk;

import java.util.Iterator;
import java.util.Map;

import javax.annotation.Nullable;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.drill.common.collections.ImmutableEntry;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.zookeeper.CreateMode;

/**
 * A namespace aware Zookeeper client.
 *
 * The implementation only operates under the given namespace and is safe to use.
 *
 * Note that an instance of this class holds onto resources that must be released via {@link #close()}.
 */
public class ZookeeperClient implements AutoCloseable {
  private final CuratorFramework curator;
  private final String root;
  private final PathChildrenCache cache;
  private final CreateMode mode;

  /**
   * Creates a client rooted at the given path. The client is not usable until {@link #start()} is called.
   *
   * @param curator curator client used for all Zookeeper access; must not be null
   * @param root    absolute (starting with '/'), non-empty path under which this client operates
   * @param mode    create mode applied to every node created through this client; must not be null
   */
  public ZookeeperClient(final CuratorFramework curator, final String root, final CreateMode mode) {
    this.curator = Preconditions.checkNotNull(curator, "curator is required");
    Preconditions.checkArgument(!Strings.isNullOrEmpty(root), "root path is required");
    Preconditions.checkArgument(root.charAt(0) == '/', "root path must be absolute");
    this.root = root;
    this.mode = Preconditions.checkNotNull(mode, "mode is required");
    this.cache = new PathChildrenCache(curator, root, true);
  }

  /**
   * Starts the client. This call ensures the creation of the root path.
   *
   * @throws Exception if cache fails to start or root path creation fails.
   * @see #close()
   */
  public void start() throws Exception {
    curator.newNamespaceAwareEnsurePath(root).ensure(curator.getZookeeperClient()); // ensure root is created
    getCache().start();
  }

  /**
   * Returns the children cache watching {@link #getRoot() root}.
   */
  public PathChildrenCache getCache() {
    return cache;
  }

  /**
   * Returns the absolute root path this client operates under.
   */
  public String getRoot() {
    return root;
  }

  /**
   * Returns the create mode used for nodes created through this client.
   */
  public CreateMode getMode() {
    return mode;
  }

  /**
   * Returns true if path exists in the cache, false otherwise.
   *
   * Note that calls to this method are eventually consistent.
   *
   * @param path path to check, relative to {@link #getRoot() root}
   */
  public boolean hasPath(final String path) {
    return hasPath(path, false);
  }

  /**
   * Checks if the given path exists.
   *
   * If the flag consistent is set, the check is consistent as it is made against Zookeeper directly. Otherwise,
   * the check is made against the local cache and is eventually consistent.
   *
   * @param path       path to check, relative to {@link #getRoot() root}
   * @param consistent whether the check should be consistent
   * @return true if the path exists, false otherwise
   * @throws DrillRuntimeException if the check itself fails
   */
  public boolean hasPath(final String path, final boolean consistent) {
    Preconditions.checkNotNull(path, "path is required");

    final String target = PathUtils.join(root, path);
    try {
      if (consistent) {
        return curator.checkExists().forPath(target) != null;
      } else {
        return getCache().getCurrentData(target) != null;
      }
    } catch (final Exception e) {
      throw new DrillRuntimeException("error while checking path on zookeeper", e);
    }
  }

  /**
   * Returns a value corresponding to the given path if path exists in the cache, null otherwise.
   *
   * Note that calls to this method are eventually consistent.
   *
   * @param path target path, relative to {@link #getRoot() root}
   */
  public byte[] get(final String path) {
    return get(path, false);
  }

  /**
   * Returns the value stored at the given path.
   *
   * If the flag consistent is set, the read is made against Zookeeper directly and any failure is rethrown as a
   * {@link DrillRuntimeException}. Otherwise, the read is served from the local cache, is eventually consistent
   * and yields null when the cache holds no entry for the path.
   *
   * NOTE(review): a consistent read of a missing node presumably surfaces as an exception rather than null --
   * confirm against Curator's getData() behaviour.
   *
   * @param path       target path, relative to {@link #getRoot() root}
   * @param consistent whether the read should be consistent
   */
  public byte[] get(final String path, final boolean consistent) {
    Preconditions.checkNotNull(path, "path is required");

    final String target = PathUtils.join(root, path);
    if (consistent) {
      try {
        return curator.getData().forPath(target);
      } catch (final Exception ex) {
        throw new DrillRuntimeException(String.format("error retrieving value for [%s]", path), ex);
      }
    } else {
      final ChildData data = getCache().getCurrentData(target);
      if (data != null) {
        return data.getData();
      }
    }
    return null;
  }

  /**
   * Creates the given path without placing any data in.
   *
   * @param path target path, relative to {@link #getRoot() root}
   * @throws DrillRuntimeException if the node cannot be created
   */
  public void create(final String path) {
    Preconditions.checkNotNull(path, "path is required");

    final String target = PathUtils.join(root, path);
    try {
      curator.create().withMode(mode).forPath(target);
      // refresh the cached entry right away instead of waiting for the watcher to catch up
      getCache().rebuildNode(target);
    } catch (final Exception e) {
      // the message previously read "unable to put ", which misreported the failing operation
      throw new DrillRuntimeException(String.format("unable to create node at %s", target), e);
    }
  }

  /**
   * Puts the given byte sequence into the given path.
   *
   * If path does not exist, this call creates it.
   *
   * @param path target path, relative to {@link #getRoot() root}
   * @param data data to store
   * @throws DrillRuntimeException if the data cannot be written
   */
  public void put(final String path, final byte[] data) {
    Preconditions.checkNotNull(path, "path is required");
    Preconditions.checkNotNull(data, "data is required");

    final String target = PathUtils.join(root, path);
    try {
      // we make a consistent read to ensure this call won't fail upon consecutive calls on the same path
      // before cache is updated
      if (hasPath(path, true)) {
        curator.setData().forPath(target, data);
      } else {
        curator.create().withMode(mode).forPath(target, data);
      }
      getCache().rebuildNode(target);

    } catch (final Exception e) {
      // name the target so the failure can be traced, matching the message style used by delete()
      throw new DrillRuntimeException(String.format("unable to put data at %s", target), e);
    }
  }

  /**
   * Deletes the given node residing at the given path
   *
   * @param path target path to delete, relative to {@link #getRoot() root}
   * @throws DrillRuntimeException if the node cannot be deleted
   */
  public void delete(final String path) {
    Preconditions.checkNotNull(path, "path is required");

    final String target = PathUtils.join(root, path);
    try {
      curator.delete().forPath(target);
      getCache().rebuildNode(target);
    } catch (final Exception e) {
      throw new DrillRuntimeException(String.format("unable to delete node at %s", target), e);
    }
  }

  /**
   * Returns an iterator of (key, value) pairs residing under {@link #getRoot() root} path.
   *
   * Entries are read from the local cache; keys have the root prefix removed.
   */
  public Iterator<Map.Entry<String, byte[]>> entries() {
    final String prefix = PathUtils.join(root, "/");
    return Iterables.transform(getCache().getCurrentData(), new Function<ChildData, Map.Entry<String, byte[]>>() {
      @Nullable
      @Override
      public Map.Entry<String, byte[]> apply(final ChildData data) {
        // normalize key name removing the root prefix. resultant key must be a relative path, not beginning with a '/'.
        final String key = data.getPath().replace(prefix, "");
        return new ImmutableEntry<>(key, data.getData());
      }
    }).iterator();
  }

  /**
   * Closes the children cache. The curator client passed to the constructor is not closed here.
   */
  @Override
  public void close() throws Exception {
    getCache().close();
  }

}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/exception/StoreException.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/StoreException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/StoreException.java new file mode 100644 index 0000000..506d485 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/StoreException.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.drill.exec.exception;

import org.apache.drill.common.exceptions.DrillException;

/**
 * A {@link DrillException} subtype; presumably raised by store operations (inferred from the name -- confirm
 * against callers). Every constructor simply forwards its arguments to the matching superclass constructor.
 */
public class StoreException extends DrillException {
  public StoreException() {
    super();
  }

  /**
   * @param cause underlying failure
   */
  public StoreException(Throwable cause) {
    super(cause);
  }

  /**
   * @param message description of the failure
   */
  public StoreException(String message) {
    super(message);
  }

  /**
   * @param message description of the failure
   * @param cause   underlying failure
   */
  public StoreException(String message, Throwable cause) {
    super(message, cause);
  }

  /**
   * @param message            description of the failure
   * @param cause              underlying failure
   * @param enableSuppression  passed through to the superclass unchanged
   * @param writableStackTrace passed through to the superclass unchanged
   */
  public StoreException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
    super(message, cause, enableSuppression, writableStackTrace);
  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/InstanceSerializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/InstanceSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/InstanceSerializer.java new file mode 100644 index 0000000..f44d835 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/InstanceSerializer.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.drill.exec.serialization;

import java.io.IOException;

/**
 * Converts instances of {@code T} to and from a raw byte representation.
 *
 * @param <T> type of the instances handled by this serializer
 */
public interface InstanceSerializer<T> {
  /**
   * Encodes the given instance as bytes.
   *
   * @param instance instance to encode
   * @return encoded form of the instance
   * @throws IOException if encoding fails
   */
  byte[] serialize(T instance) throws IOException;

  /**
   * Decodes an instance from the given bytes.
   *
   * @param raw encoded form of an instance
   * @return the decoded instance
   * @throws IOException if decoding fails
   */
  T deserialize(byte[] raw) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/JacksonSerializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/JacksonSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/JacksonSerializer.java new file mode 100644 index 0000000..676929d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/JacksonSerializer.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.drill.exec.serialization;

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Objects;

/**
 * {@link InstanceSerializer} backed by Jackson: instances are written with an {@link ObjectWriter} and read back
 * with an {@link ObjectReader}, both obtained from the mapper given at construction time.
 *
 * @param <T> type of the instances handled by this serializer
 */
public class JacksonSerializer<T> implements InstanceSerializer<T> {
  private final ObjectReader reader;
  private final ObjectWriter writer;

  /**
   * @param mapper mapper from which the reader and writer are derived
   * @param klazz  class the reader is configured to produce
   */
  public JacksonSerializer(final ObjectMapper mapper, final Class<T> klazz) {
    this.reader = mapper.readerFor(klazz);
    this.writer = mapper.writer();
  }

  /**
   * Reads an instance from the given bytes using the configured reader.
   */
  @Override
  public T deserialize(final byte[] raw) throws IOException {
    return reader.readValue(raw);
  }

  /**
   * Writes the given instance to bytes using the configured writer.
   */
  @Override
  public byte[] serialize(final T instance) throws IOException {
    return writer.writeValueAsBytes(instance);
  }

  /**
   * Two serializers are equal when they are of the exact same class and their reader and writer are equal.
   *
   * NOTE(review): the outcome therefore depends on how ObjectReader/ObjectWriter define equals -- confirm that two
   * serializers built from the same mapper and class compare as intended.
   */
  @Override
  public boolean equals(Object obj) {
    // a null argument or a different runtime class can never match
    if (obj == null || !getClass().equals(obj.getClass())) {
      return false;
    }
    final JacksonSerializer<?> that = (JacksonSerializer<?>) obj;
    final boolean sameReader = Objects.equal(reader, that.reader);
    return sameReader && Objects.equal(writer, that.writer);
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(reader, writer);
  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/ProtoSerializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/ProtoSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/ProtoSerializer.java new file mode 100644 index 0000000..e3ee5f6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/ProtoSerializer.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.drill.exec.serialization;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import com.dyuproject.protostuff.JsonIOUtil;
import com.dyuproject.protostuff.Schema;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.protobuf.Message;

/**
 * {@link InstanceSerializer} that goes through protostuff's {@link JsonIOUtil}: values are written with a schema
 * for {@code T} and read back by merging into a builder of type {@code B}, which is then built.
 *
 * @param <T> type of the instances handled by this serializer
 * @param <B> builder type used while reading; the result of {@code build()} is cast to {@code T}
 */
public class ProtoSerializer<T, B extends Message.Builder> implements InstanceSerializer<T> {
  private final Schema<B> readSchema;
  private final Schema<T> writeSchema;

  /**
   * @param readSchema  schema used to create and populate builders while reading; must not be null
   * @param writeSchema schema used to write instances; must not be null
   */
  public ProtoSerializer(final Schema<B> readSchema, final Schema<T> writeSchema) {
    this.readSchema = Preconditions.checkNotNull(readSchema);
    this.writeSchema = Preconditions.checkNotNull(writeSchema);
  }

  /**
   * Merges the given bytes into a fresh builder and returns the built message.
   */
  @Override
  @SuppressWarnings("unchecked")
  public T deserialize(final byte[] raw) throws IOException {
    final B target = readSchema.newMessage();
    JsonIOUtil.mergeFrom(raw, target, readSchema, false);
    // the builder's product is assumed to be a T; nothing here verifies that B and T belong together
    final Object built = target.build();
    return (T) built;
  }

  /**
   * Writes the given instance into an in-memory buffer and returns the buffer's contents.
   */
  @Override
  public byte[] serialize(final T instance) throws IOException {
    final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    JsonIOUtil.writeTo(buffer, instance, writeSchema, false);
    return buffer.toByteArray();
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(readSchema, writeSchema);
  }

  /**
   * Two serializers are equal when they are of the exact same class and both of their schemas are equal.
   */
  @Override
  public boolean equals(final Object obj) {
    // a null argument or a different runtime class can never match
    if (obj == null || !getClass().equals(obj.getClass())) {
      return false;
    }
    final ProtoSerializer<?, ?> that = (ProtoSerializer<?, ?>) obj;
    final boolean sameReadSchema = Objects.equal(readSchema, that.readSchema);
    return sameReadSchema && Objects.equal(writeSchema, that.writeSchema);
  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index c781493..441fa91 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -37,10 +37,10 @@ import org.apache.drill.exec.server.options.OptionValue.OptionType; import org.apache.drill.exec.server.rest.WebServer; import org.apache.drill.exec.service.ServiceEngine; import org.apache.drill.exec.store.StoragePluginRegistry; -import org.apache.drill.exec.store.sys.CachingStoreProvider; -import org.apache.drill.exec.store.sys.PStoreProvider; -import org.apache.drill.exec.store.sys.PStoreRegistry; -import org.apache.drill.exec.store.sys.local.LocalPStoreProvider; +import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider; +import org.apache.drill.exec.store.sys.PersistentStoreProvider; +import org.apache.drill.exec.store.sys.PersistentStoreRegistry; +import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider; import org.apache.drill.exec.work.WorkManager; import org.apache.zookeeper.Environment; @@ -63,7 +63,7 @@ public class Drillbit implements AutoCloseable { private final ClusterCoordinator coord; private final ServiceEngine engine; - private final PStoreProvider storeProvider; + private final PersistentStoreProvider storeProvider; private 
final WorkManager manager; private final BootStrapContext context; private final WebServer webServer; @@ -93,10 +93,10 @@ public class Drillbit implements AutoCloseable { if (serviceSet != null) { coord = serviceSet.getCoordinator(); - storeProvider = new CachingStoreProvider(new LocalPStoreProvider(config)); + storeProvider = new CachingPersistentStoreProvider(new LocalPersistentStoreProvider(config)); } else { coord = new ZKClusterCoordinator(config); - storeProvider = new PStoreRegistry(this.coord, config).newPStoreProvider(); + storeProvider = new PersistentStoreRegistry(this.coord, config).newPStoreProvider(); } logger.info("Construction completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS)); } http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java index aa6a0da..1af6d11 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java @@ -39,11 +39,11 @@ import org.apache.drill.exec.rpc.data.DataConnectionCreator; import org.apache.drill.exec.server.options.SystemOptionManager; import org.apache.drill.exec.store.SchemaFactory; import org.apache.drill.exec.store.StoragePluginRegistry; -import org.apache.drill.exec.store.sys.PStoreProvider; +import org.apache.drill.exec.store.sys.PersistentStoreProvider; import com.codahale.metrics.MetricRegistry; -public class DrillbitContext { +public class DrillbitContext implements AutoCloseable { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class); private final BootStrapContext context; @@ -57,7 +57,7 @@ public class DrillbitContext { 
private final WorkEventBus workBus; private final FunctionImplementationRegistry functionRegistry; private final SystemOptionManager systemOptions; - private final PStoreProvider provider; + private final PersistentStoreProvider provider; private final CodeCompiler compiler; private final ScanResult classpathScan; private final LogicalPlanPersistence lpPersistence; @@ -70,7 +70,7 @@ public class DrillbitContext { Controller controller, DataConnectionCreator connectionsPool, WorkEventBus workBus, - PStoreProvider provider) { + PersistentStoreProvider provider) { this.classpathScan = context.getClasspathScan(); this.workBus = workBus; this.controller = checkNotNull(controller); @@ -152,7 +152,7 @@ public class DrillbitContext { return reader; } - public PStoreProvider getPersistentStoreProvider() { + public PersistentStoreProvider getStoreProvider() { return provider; } @@ -180,4 +180,8 @@ public class DrillbitContext { return classpathScan; } + @Override + public void close() throws Exception { + getOptionManager().close(); + } } http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java index 72eb306..06bb686 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/RemoteServiceSet.java @@ -17,15 +17,12 @@ */ package org.apache.drill.exec.server; -import java.io.Closeable; -import java.io.IOException; - import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.local.LocalClusterCoordinator; import org.apache.drill.exec.memory.BufferAllocator; -public class 
RemoteServiceSet implements Closeable { +public class RemoteServiceSet implements AutoCloseable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteServiceSet.class); private final ClusterCoordinator coordinator; @@ -41,7 +38,7 @@ public class RemoteServiceSet implements Closeable { } @Override - public void close() throws IOException { + public void close() throws Exception { coordinator.close(); } http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java index a2b2e93..8753a51 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionValue.java @@ -23,11 +23,12 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import org.apache.drill.exec.store.sys.PersistentStore; /** * An {@link OptionValue option value} is used by an {@link OptionManager} to store a run-time setting. This setting, * for example, could affect a query in execution stage. Instances of this class are JSON serializable and can be stored - * in a {@link org.apache.drill.exec.store.sys.PStore persistent store} (see {@link SystemOptionManager#options}), or + * in a {@link PersistentStore persistent store} (see {@link SystemOptionManager#options}), or * in memory (see {@link InMemoryOptionManager#options}). 
*/ @JsonInclude(Include.NON_NULL) http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index e54b914..8b14076 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.commons.collections.IteratorUtils; import org.apache.drill.common.config.LogicalPlanPersistence; @@ -34,9 +35,9 @@ import org.apache.drill.exec.compile.ClassTransformer; import org.apache.drill.exec.compile.QueryClassLoader; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.server.options.OptionValue.OptionType; -import org.apache.drill.exec.store.sys.PStore; -import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.drill.exec.store.sys.PStoreProvider; +import org.apache.drill.exec.store.sys.PersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreConfig; +import org.apache.drill.exec.store.sys.PersistentStoreProvider; import org.apache.drill.exec.util.AssertionUtil; import static com.google.common.base.Preconditions.checkArgument; @@ -46,7 +47,7 @@ import static com.google.common.base.Preconditions.checkArgument; * Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and * persist between restarts. 
*/ -public class SystemOptionManager extends BaseOptionManager { +public class SystemOptionManager extends BaseOptionManager implements AutoCloseable { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemOptionManager.class); private static final CaseInsensitiveMap<OptionValidator> VALIDATORS; @@ -143,19 +144,19 @@ public class SystemOptionManager extends BaseOptionManager { VALIDATORS = CaseInsensitiveMap.newImmutableMap(tmp); } - private final PStoreConfig<OptionValue> config; + private final PersistentStoreConfig<OptionValue> config; - private final PStoreProvider provider; + private final PersistentStoreProvider provider; /** * Persistent store for options that have been changed from default. * NOTE: CRUD operations must use lowercase keys. */ - private PStore<OptionValue> options; + private PersistentStore<OptionValue> options; - public SystemOptionManager(LogicalPlanPersistence lpPersistence, final PStoreProvider provider) { + public SystemOptionManager(LogicalPlanPersistence lpPersistence, final PersistentStoreProvider provider) { this.provider = provider; - this.config = PStoreConfig.newJacksonBuilder(lpPersistence.getMapper(), OptionValue.class) + this.config = PersistentStoreConfig.newJacksonBuilder(lpPersistence.getMapper(), OptionValue.class) .name("sys.options") .build(); } @@ -166,10 +167,10 @@ public class SystemOptionManager extends BaseOptionManager { * @return this option manager * @throws IOException */ - public SystemOptionManager init() throws IOException { - options = provider.getStore(config); + public SystemOptionManager init() throws Exception { + options = provider.getOrCreateStore(config); // if necessary, deprecate and replace options from persistent store - for (final Entry<String, OptionValue> option : options) { + for (final Entry<String, OptionValue> option : Lists.newArrayList(options.getAll())) { final String name = option.getKey(); final OptionValidator validator = VALIDATORS.get(name); if 
(validator == null) { @@ -215,7 +216,7 @@ public class SystemOptionManager extends BaseOptionManager { buildList.put(entry.getKey(), entry.getValue().getDefault()); } // override if changed - for (final Map.Entry<String, OptionValue> entry : options) { + for (final Map.Entry<String, OptionValue> entry : Lists.newArrayList(options.getAll())) { buildList.put(entry.getKey(), entry.getValue()); } return buildList.values().iterator(); @@ -260,7 +261,7 @@ public class SystemOptionManager extends BaseOptionManager { public void deleteAllOptions(OptionType type) { checkArgument(type == OptionType.SYSTEM, "OptionType must be SYSTEM."); final Set<String> names = Sets.newHashSet(); - for (final Map.Entry<String, OptionValue> entry : options) { + for (final Map.Entry<String, OptionValue> entry : Lists.newArrayList(options.getAll())) { names.add(entry.getKey()); } for (final String name : names) { @@ -272,4 +273,9 @@ public class SystemOptionManager extends BaseOptionManager { public OptionList getOptionList() { return (OptionList) IteratorUtils.toList(iterator()); } + + @Override + public void close() throws Exception { + options.close(); + } } http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java index 300c617..d8533b0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java @@ -22,7 +22,7 @@ import org.apache.drill.exec.server.rest.auth.AuthDynamicFeature; import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal; import org.apache.drill.exec.server.rest.profile.ProfileResources; import 
org.apache.drill.exec.store.StoragePluginRegistry; -import org.apache.drill.exec.store.sys.PStoreProvider; +import org.apache.drill.exec.store.sys.PersistentStoreProvider; import org.apache.drill.exec.work.WorkManager; import org.glassfish.hk2.api.Factory; import org.glassfish.hk2.utilities.binding.AbstractBinder; @@ -80,7 +80,7 @@ public class DrillRestServer extends ResourceConfig { protected void configure() { bind(workManager).to(WorkManager.class); bind(workManager.getContext().getLpPersistence().getMapper()).to(ObjectMapper.class); - bind(workManager.getContext().getPersistentStoreProvider()).to(PStoreProvider.class); + bind(workManager.getContext().getStoreProvider()).to(PersistentStoreProvider.class); bind(workManager.getContext().getStorage()).to(StoragePluginRegistry.class); bindFactory(DrillUserPrincipalProvider.class).to(DrillUserPrincipal.class); } http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java index 2af9cac..3266eda 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java @@ -73,7 +73,7 @@ public class StorageResources { public List<PluginConfigWrapper> getStoragePluginsJSON() { List<PluginConfigWrapper> list = Lists.newArrayList(); - for (Map.Entry<String, StoragePluginConfig> entry : storage.getStore()) { + for (Map.Entry<String, StoragePluginConfig> entry : Lists.newArrayList(storage.getStore().getAll())) { PluginConfigWrapper plugin = new PluginConfigWrapper(entry.getKey(), entry.getValue()); list.add(plugin); } 
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java index 0c04c9e..ddc9da1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.server.rest.profile; -import java.io.IOException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collections; @@ -36,7 +35,10 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.SecurityContext; import javax.xml.bind.annotation.XmlRootElement; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.store.TransientStore; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryInfo; @@ -44,8 +46,8 @@ import org.apache.drill.exec.proto.UserBitShared.QueryProfile; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.server.rest.ViewableWithPermissions; import org.apache.drill.exec.server.rest.auth.DrillUserPrincipal; -import org.apache.drill.exec.store.sys.PStore; -import org.apache.drill.exec.store.sys.PStoreProvider; +import org.apache.drill.exec.store.sys.PersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreProvider; import org.apache.drill.exec.work.WorkManager; import org.apache.drill.exec.work.foreman.Foreman; 
import org.apache.drill.exec.work.foreman.QueryManager; @@ -58,6 +60,8 @@ import com.google.common.collect.Lists; public class ProfileResources { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProfileResources.class); + public final static int MAX_PROFILES = 100; + @Inject WorkManager work; @Inject DrillUserPrincipal principal; @Inject SecurityContext sc; @@ -119,8 +123,12 @@ public class ProfileResources { } - private PStoreProvider provider(){ - return work.getContext().getPersistentStoreProvider(); + protected PersistentStoreProvider getProvider() { + return work.getContext().getStoreProvider(); + } + + protected ClusterCoordinator getCoordinator() { + return work.getContext().getClusterCoordinator(); } @XmlRootElement @@ -146,38 +154,37 @@ public class ProfileResources { @Path("/profiles.json") @Produces(MediaType.APPLICATION_JSON) public QProfiles getProfilesJSON() { - PStore<QueryProfile> completed = null; - PStore<QueryInfo> running = null; try { - completed = provider().getStore(QueryManager.QUERY_PROFILE); - running = provider().getStore(QueryManager.RUNNING_QUERY_INFO); - } catch (IOException e) { - logger.debug("Failed to get profiles from persistent or ephemeral store."); - return new QProfiles(new ArrayList<ProfileInfo>(), new ArrayList<ProfileInfo>()); - } + final PersistentStore<QueryProfile> completed = getProvider().getOrCreateStore(QueryManager.QUERY_PROFILE); + final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO); - List<ProfileInfo> runningQueries = Lists.newArrayList(); + final List<ProfileInfo> runningQueries = Lists.newArrayList(); - for (Map.Entry<String, QueryInfo> entry : running) { - QueryInfo profile = entry.getValue(); - if (principal.canManageProfileOf(profile.getUser())) { - runningQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(), - profile.getQuery(), profile.getState().name(), 
profile.getUser())); + for (final Map.Entry<String, QueryInfo> entry: Lists.newArrayList(running.entries())) { + final QueryInfo profile = entry.getValue(); + if (principal.canManageProfileOf(profile.getUser())) { + runningQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(), + profile.getQuery(), profile.getState().name(), profile.getUser())); + } } - } - Collections.sort(runningQueries, Collections.reverseOrder()); + Collections.sort(runningQueries, Collections.reverseOrder()); - List<ProfileInfo> finishedQueries = Lists.newArrayList(); - for (Map.Entry<String, QueryProfile> entry : completed) { - QueryProfile profile = entry.getValue(); - if (principal.canManageProfileOf(profile.getUser())) { - finishedQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(), - profile.getQuery(), profile.getState().name(), profile.getUser())); + List<ProfileInfo> finishedQueries = Lists.newArrayList(); + for (Map.Entry<String, QueryProfile> entry : Lists.newArrayList(completed.getRange(0, MAX_PROFILES))) { + QueryProfile profile = entry.getValue(); + if (principal.canManageProfileOf(profile.getUser())) { + finishedQueries.add(new ProfileInfo(entry.getKey(), profile.getStart(), profile.getForeman().getAddress(), + profile.getQuery(), profile.getState().name(), profile.getUser())); + } } + + return new QProfiles(runningQueries, finishedQueries); + } catch (Exception e) { + logger.debug("Failed to get profiles from persistent or ephemeral store."); + return new QProfiles(new ArrayList<ProfileInfo>(), new ArrayList<ProfileInfo>()); } - return new QProfiles(runningQueries, finishedQueries); } @GET @@ -188,7 +195,7 @@ public class ProfileResources { return ViewableWithPermissions.create("/rest/profile/list.ftl", sc, profiles); } - private QueryProfile getQueryProfile(String queryId) throws IOException { + private QueryProfile getQueryProfile(String queryId) { QueryId id = 
QueryIdHelper.getQueryIdFromString(queryId); // first check local running @@ -200,9 +207,9 @@ public class ProfileResources { } // then check remote running - try{ - PStore<QueryInfo> runningQueries = provider().getStore(QueryManager.RUNNING_QUERY_INFO); - QueryInfo info = runningQueries.get(queryId); + try { + final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO); + final QueryInfo info = running.get(queryId); if (info != null) { QueryProfile queryProfile = work.getContext() .getController() @@ -217,11 +224,15 @@ public class ProfileResources { } // then check blob store - PStore<QueryProfile> profiles = provider().getStore(QueryManager.QUERY_PROFILE); - QueryProfile queryProfile = profiles.get(queryId); - if (queryProfile != null) { - checkOrThrowProfileViewAuthorization(queryProfile); - return queryProfile; + try { + final PersistentStore<QueryProfile> profiles = getProvider().getOrCreateStore(QueryManager.QUERY_PROFILE); + final QueryProfile queryProfile = profiles.get(queryId); + if (queryProfile != null) { + checkOrThrowProfileViewAuthorization(queryProfile); + return queryProfile; + } + } catch (final Exception e) { + throw new DrillRuntimeException("error while retrieving profile", e); } throw UserException.validationError() @@ -236,7 +247,7 @@ public class ProfileResources { public String getProfileJSON(@PathParam("queryid") String queryId) { try { return new String(QueryManager.QUERY_PROFILE.getSerializer().serialize(getQueryProfile(queryId))); - } catch (IOException e) { + } catch (Exception e) { logger.debug("Failed to serialize profile for: " + queryId); return ("{ 'message' : 'error (unable to serialize profile)' }"); } @@ -245,7 +256,7 @@ public class ProfileResources { @GET @Path("/profiles/{queryid}") @Produces(MediaType.TEXT_HTML) - public Viewable getProfile(@PathParam("queryid") String queryId) throws IOException { + public Viewable getProfile(@PathParam("queryid") String queryId){ 
ProfileWrapper wrapper = new ProfileWrapper(getQueryProfile(queryId)); return ViewableWithPermissions.create("/rest/profile/profile.ftl", sc, wrapper); } @@ -254,7 +265,7 @@ public class ProfileResources { @GET @Path("/profiles/cancel/{queryid}") @Produces(MediaType.TEXT_PLAIN) - public String cancelQuery(@PathParam("queryid") String queryId) throws IOException { + public String cancelQuery(@PathParam("queryid") String queryId) { QueryId id = QueryIdHelper.getQueryIdFromString(queryId); @@ -267,9 +278,9 @@ public class ProfileResources { } // then check remote running - try{ - PStore<QueryInfo> runningQueries = provider().getStore(QueryManager.RUNNING_QUERY_INFO); - QueryInfo info = runningQueries.get(queryId); + try { + final TransientStore<QueryInfo> running = getCoordinator().getOrCreateTransientStore(QueryManager.RUNNING_QUERY_INFO); + final QueryInfo info = running.get(queryId); checkOrThrowQueryCancelAuthorization(info.getUser(), queryId); Ack a = work.getContext().getController().getTunnel(info.getForeman()).requestCancelQuery(id).checkedGet(2, TimeUnit.SECONDS); if(a.getOk()){ http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java index c7d364b..b6eed2d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java @@ -26,7 +26,7 @@ import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.store.dfs.FormatPlugin; -import 
org.apache.drill.exec.store.sys.PStore; +import org.apache.drill.exec.store.sys.PersistentStore; public interface StoragePluginRegistry extends Iterable<Map.Entry<String, StoragePlugin>>, AutoCloseable { final String SYS_PLUGIN = "sys"; @@ -104,7 +104,7 @@ public interface StoragePluginRegistry extends Iterable<Map.Entry<String, Storag * Get the PStore for this StoragePluginRegistry. (Used in the management layer.) * @return PStore for StoragePlugin configuration objects. */ - PStore<StoragePluginConfig> getStore(); + PersistentStore<StoragePluginConfig> getStore(); /** * Return StoragePlugin rule sets. http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java index fefa183..e680502 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java @@ -46,6 +46,7 @@ import org.apache.drill.common.scanner.ClassPathScanner; import org.apache.drill.common.scanner.persistence.ScanResult; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.DrillbitStartupException; +import org.apache.drill.exec.exception.StoreException; import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.planner.logical.DrillRuleSets; import org.apache.drill.exec.planner.logical.StoragePlugins; @@ -54,8 +55,8 @@ import org.apache.drill.exec.store.dfs.FileSystemPlugin; import org.apache.drill.exec.store.dfs.FormatPlugin; import org.apache.drill.exec.store.ischema.InfoSchemaConfig; import org.apache.drill.exec.store.ischema.InfoSchemaStoragePlugin; -import 
org.apache.drill.exec.store.sys.PStore; -import org.apache.drill.exec.store.sys.PStoreConfig; +import org.apache.drill.exec.store.sys.PersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreConfig; import org.apache.drill.exec.store.sys.SystemTablePlugin; import org.apache.drill.exec.store.sys.SystemTablePluginConfig; @@ -82,7 +83,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { private DrillbitContext context; private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory(); - private final PStore<StoragePluginConfig> pluginSystemTable; + private final PersistentStore<StoragePluginConfig> pluginSystemTable; private final LogicalPlanPersistence lpPersistence; private final ScanResult classpathScan; private final LoadingCache<StoragePluginConfig, StoragePlugin> ephemeralPlugins; @@ -93,12 +94,12 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { this.classpathScan = checkNotNull(context.getClasspathScan()); try { this.pluginSystemTable = context // - .getPersistentStoreProvider() // - .getStore(PStoreConfig // + .getStoreProvider() // + .getOrCreateStore(PersistentStoreConfig // .newJacksonBuilder(lpPersistence.getMapper(), StoragePluginConfig.class) // .name(PSTORE_NAME) // .build()); - } catch (IOException | RuntimeException e) { + } catch (StoreException | RuntimeException e) { logger.error("Failure while loading storage plugin registry.", e); throw new RuntimeException("Failure while reading and loading storage plugin configuration.", e); } @@ -120,7 +121,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { }); } - public PStore<StoragePluginConfig> getStore() { + public PersistentStore<StoragePluginConfig> getStore() { return pluginSystemTable; } @@ -137,7 +138,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { * Check if the storage plugins system table has any entries. 
If not, load the boostrap-storage-plugin file into * the system table. */ - if (!pluginSystemTable.iterator().hasNext()) { + if (!pluginSystemTable.getAll().hasNext()) { // bootstrap load the config since no plugins are stored. logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration."); Collection<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false); @@ -162,7 +163,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { } Map<String, StoragePlugin> activePlugins = new HashMap<String, StoragePlugin>(); - for (Map.Entry<String, StoragePluginConfig> entry : pluginSystemTable) { + for (Map.Entry<String, StoragePluginConfig> entry : Lists.newArrayList(pluginSystemTable.getAll())) { String name = entry.getKey(); StoragePluginConfig config = entry.getValue(); if (config.isEnabled()) { @@ -385,7 +386,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { Set<String> currentPluginNames = Sets.newHashSet(plugins.names()); // iterate through the plugin instances in the persistence store adding // any new ones and refreshing those whose configuration has changed - for (Map.Entry<String, StoragePluginConfig> config : pluginSystemTable) { + for (Map.Entry<String, StoragePluginConfig> config : Lists.newArrayList(pluginSystemTable.getAll())) { if (config.getValue().isEnabled()) { getPlugin(config.getKey()); currentPluginNames.remove(config.getKey()); @@ -460,6 +461,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { public synchronized void close() throws Exception { ephemeralPlugins.invalidateAll(); plugins.close(); + pluginSystemTable.close(); } /** http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java ---------------------------------------------------------------------- diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java new file mode 100644 index 0000000..248c3cb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.sys; + +import java.util.Iterator; +import java.util.Map; + +import org.apache.drill.common.collections.ImmutableEntry; + +public abstract class BasePersistentStore<V> implements PersistentStore<V> { + + @Override + public Iterator<Map.Entry<String, V>> getAll() { + return getRange(0, Integer.MAX_VALUE); + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java deleted file mode 100644 index 68440cb..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/CachingStoreProvider.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.store.sys; - -import java.io.IOException; -import java.util.concurrent.ConcurrentMap; - -import org.apache.drill.exec.store.sys.PStoreConfig.Mode; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; - -public class CachingStoreProvider implements PStoreProvider, AutoCloseable { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CachingStoreProvider.class); - - private final ConcurrentMap<PStoreConfig<?>, PStore<?>> storeCache = Maps.newConcurrentMap(); - private final PStoreProvider provider; - - public CachingStoreProvider(PStoreProvider provider) { - super(); - this.provider = provider; - } - - @SuppressWarnings("unchecked") - public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException { - PStore<?> s = storeCache.get(config); - if(s == null){ - PStore<?> newStore = provider.getStore(config); - s = storeCache.putIfAbsent(config, newStore); - if(s == null){ - s = newStore; - }else{ - newStore.close(); - } - } - - return (PStore<V>) s; - - } - - @Override - public void start() throws IOException { - provider.start(); - } - - @Override - public void close() throws Exception { - for(PStore<?> store : storeCache.values()){ - store.close(); - } - storeCache.clear(); - provider.close(); - } - -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java deleted file mode 100644 index 2d04957..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStore.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.drill.exec.store.sys; - - -/** - * Interfaces to define EStore, which is keep track of status/information for running queries. The information - * would be gone, if the query is completed, or the foreman drillbit is not responding. - * @param <V> - */ -public interface EStore <V> extends PStore<V> { -}
