http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java deleted file mode 100644 index 4c79a28..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/EStoreProvider.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.drill.exec.store.sys; - - -/** - * Interface to define the provider which return EStore. - */ - -public interface EStoreProvider extends PStoreProvider { -}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java index 85a1c3c..5c8a641 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/OptionIterator.java @@ -21,15 +21,14 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.server.options.DrillConfigIterator; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.server.options.OptionValue; import org.apache.drill.exec.server.options.OptionValue.Kind; import org.apache.drill.exec.server.options.OptionValue.OptionType; - -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; import org.apache.drill.exec.server.options.SystemOptionManager; public class OptionIterator implements Iterator<Object> { http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java deleted file mode 100644 index b629645..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStore.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.sys; - -import java.util.Map; - - -/** - * Interface for reading and writing values to a persistent storage provider. Iterators are guaranteed to be returned in key order. - * @param <V> - */ -public interface PStore<V> extends Iterable<Map.Entry<String, V>> { - public V get(String key); - public void put(String key, V value); - public boolean putIfAbsent(String key, V value); - public void delete(String key); - public void close(); -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java deleted file mode 100644 index bd9d977..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreConfig.java +++ /dev/null @@ -1,166 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.sys; - -import org.apache.drill.exec.store.sys.serialize.JacksonSerializer; -import org.apache.drill.exec.store.sys.serialize.PClassSerializer; -import org.apache.drill.exec.store.sys.serialize.ProtoSerializer; - -import com.dyuproject.protostuff.Schema; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Preconditions; -import com.google.protobuf.Message; -import com.google.protobuf.Message.Builder; - -public class PStoreConfig<V> { - - private final String name; - private final PClassSerializer<V> valueSerializer; - private final Mode mode; - private final int maxIteratorSize; - - public static enum Mode {PERSISTENT, EPHEMERAL, BLOB_PERSISTENT}; - - private PStoreConfig(String name, PClassSerializer<V> valueSerializer, Mode mode, int maxIteratorSize) { - super(); - this.name = name; - this.valueSerializer = valueSerializer; - this.mode = mode; - this.maxIteratorSize = Math.abs(maxIteratorSize); - } - - public Mode getMode() { - return mode; - } - - public int getMaxIteratorSize() { - return maxIteratorSize; - } - - public String getName() { - return name; - } - - public PClassSerializer<V> getSerializer() { - return valueSerializer; - } - - public static <V extends Message, X extends Builder> PStoreConfigBuilder<V> newProtoBuilder(Schema<V> writeSchema, Schema<X> readSchema) { - return new 
PStoreConfigBuilder<V>(new ProtoSerializer<V, X>(writeSchema, readSchema)); - } - - public static <V> PStoreConfigBuilder<V> newJacksonBuilder(ObjectMapper mapper, Class<V> clazz) { - return new PStoreConfigBuilder<V>(new JacksonSerializer<V>(mapper, clazz)); - } - - public static class PStoreConfigBuilder<V> { - String name; - PClassSerializer<V> serializer; - Mode mode = Mode.PERSISTENT; - int maxIteratorSize = Integer.MAX_VALUE; - - PStoreConfigBuilder(PClassSerializer<V> serializer) { - super(); - this.serializer = serializer; - } - - public PStoreConfigBuilder<V> name(String name) { - this.name = name; - return this; - } - - public PStoreConfigBuilder<V> persist(){ - this.mode = Mode.PERSISTENT; - return this; - } - - public PStoreConfigBuilder<V> ephemeral(){ - this.mode = Mode.EPHEMERAL; - return this; - } - - public PStoreConfigBuilder<V> blob(){ - this.mode = Mode.BLOB_PERSISTENT; - return this; - } - - /** - * Set the maximum size of the iterator. Positive numbers start from the start of the list. Negative numbers start from the end of the list. - * @param size - * @return - */ - public PStoreConfigBuilder<V> max(int size){ - this.maxIteratorSize = size; - return this; - } - - public PStoreConfig<V> build(){ - Preconditions.checkNotNull(name); - return new PStoreConfig<V>(name, serializer, mode, maxIteratorSize); - } - - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + maxIteratorSize; - result = prime * result + ((mode == null) ? 0 : mode.hashCode()); - result = prime * result + ((name == null) ? 0 : name.hashCode()); - result = prime * result + ((valueSerializer == null) ? 
0 : valueSerializer.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - PStoreConfig other = (PStoreConfig) obj; - if (maxIteratorSize != other.maxIteratorSize) { - return false; - } - if (mode != other.mode) { - return false; - } - if (name == null) { - if (other.name != null) { - return false; - } - } else if (!name.equals(other.name)) { - return false; - } - if (valueSerializer == null) { - if (other.valueSerializer != null) { - return false; - } - } else if (!valueSerializer.equals(other.valueSerializer)) { - return false; - } - return true; - } - - -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java deleted file mode 100644 index efa223e..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreProvider.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.sys; - -import java.io.IOException; - -public interface PStoreProvider extends AutoCloseable { - public <V> PStore<V> getStore(PStoreConfig<V> config) throws IOException; - public void start() throws IOException; -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java deleted file mode 100644 index 532e6be..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PStoreRegistry.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.store.sys; - -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; - -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.coord.ClusterCoordinator; - -import com.typesafe.config.ConfigException; - -public class PStoreRegistry { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PStoreRegistry.class); - - private DrillConfig config; - private ClusterCoordinator coord; - - public PStoreRegistry(ClusterCoordinator coord, DrillConfig config) { - this.coord = coord; - this.config = config; - } - - public ClusterCoordinator getClusterCoordinator() { - return this.coord; - } - - public DrillConfig getConfig() { - return this.config; - } - - @SuppressWarnings("unchecked") - public PStoreProvider newPStoreProvider() throws ExecutionSetupException { - try { - String storeProviderClassName = config.getString(ExecConstants.SYS_STORE_PROVIDER_CLASS); - logger.info("Using the configured PStoreProvider class: '{}'.", storeProviderClassName); - Class<? extends PStoreProvider> storeProviderClass = (Class<? extends PStoreProvider>) Class.forName(storeProviderClassName); - Constructor<? 
extends PStoreProvider> c = storeProviderClass.getConstructor(PStoreRegistry.class); - return new CachingStoreProvider(c.newInstance(this)); - } catch (ConfigException.Missing | ClassNotFoundException | NoSuchMethodException | SecurityException - | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { - logger.error(e.getMessage(), e); - throw new ExecutionSetupException("A System Table provider was either not specified or could not be found or instantiated", e); - } - } - -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java new file mode 100644 index 0000000..767b1d5 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.sys; + +import java.util.Iterator; +import java.util.Map; + +/** + * An abstraction used to store and retrieve instances of a given value type. + * + * @param <V> value type + */ +public interface PersistentStore<V> extends AutoCloseable { + /** + * Returns the storage {@link PersistentStoreMode mode} of this store. + */ + PersistentStoreMode getMode(); + + /** + * Returns the value for the given key if it exists, null otherwise. + * @param key lookup key + */ + V get(String key); + + /** + * Stores the (key, value) tuple in the store. The lifetime of the tuple depends upon the store {@link #getMode mode}. + * + * @param key lookup key + * @param value value to store + */ + void put(String key, V value); + + + /** + * Removes the value corresponding to the given key if it exists; does nothing otherwise. + * @param key lookup key + */ + void delete(String key); + + /** + * Stores the (key, value) tuple in the store only if it does not already exist. + * + * @param key lookup key + * @param value value to store + * @return true if the put takes place, false otherwise. + */ + boolean putIfAbsent(String key, V value); + + /** + * Returns an iterator over at most {@code take} entries, skipping the first {@code skip} entries. + * + * @param skip number of records to skip from the beginning + * @param take maximum number of records to return + */ + Iterator<Map.Entry<String, V>> getRange(int skip, int take); + + /** + * Returns an iterator of entries.
+ */ + Iterator<Map.Entry<String, V>> getAll(); + +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java new file mode 100644 index 0000000..ca319f2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreConfig.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.sys; + +import com.dyuproject.protostuff.Schema; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; +import com.google.protobuf.Message; +import com.google.protobuf.Message.Builder; +import org.apache.drill.exec.serialization.InstanceSerializer; +import org.apache.drill.exec.serialization.JacksonSerializer; +import org.apache.drill.exec.serialization.ProtoSerializer; + + +/** + * An abstraction for configurations that are used to create a {@link PersistentStore store}. + * + * @param <V> type of the values that the {@link PersistentStore} stores and retrieves + */ +public class PersistentStoreConfig<V> { + + private final String name; + private final InstanceSerializer<V> valueSerializer; + private final PersistentStoreMode mode; + + protected PersistentStoreConfig(String name, InstanceSerializer<V> valueSerializer, PersistentStoreMode mode) { + this.name = name; + this.valueSerializer = valueSerializer; + this.mode = mode; + } + + public PersistentStoreMode getMode() { + return mode; + } + + public String getName() { + return name; + } + + public InstanceSerializer<V> getSerializer() { + return valueSerializer; + } + + @Override + public int hashCode() { + return Objects.hashCode(name, valueSerializer, mode); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof PersistentStoreConfig) { + final PersistentStoreConfig other = PersistentStoreConfig.class.cast(obj); + return Objects.equal(name, other.name) + && Objects.equal(valueSerializer, other.valueSerializer) + && Objects.equal(mode, other.mode); + } + return false; + } + + public static <V extends Message, X extends Builder> StoreConfigBuilder<V> newProtoBuilder(Schema<V> writeSchema, Schema<X> readSchema) { + return new StoreConfigBuilder<>(new ProtoSerializer<>(readSchema, writeSchema)); + } + + public static <V> StoreConfigBuilder<V>
newJacksonBuilder(ObjectMapper mapper, Class<V> clazz) { + return new StoreConfigBuilder<>(new JacksonSerializer<>(mapper, clazz)); + } + + public static class StoreConfigBuilder<V> { + private String name; + private InstanceSerializer<V> serializer; + private PersistentStoreMode mode = PersistentStoreMode.PERSISTENT; + + protected StoreConfigBuilder(InstanceSerializer<V> serializer) { + super(); + this.serializer = serializer; + } + + public StoreConfigBuilder<V> name(String name) { + this.name = name; + return this; + } + + public StoreConfigBuilder<V> persist(){ + this.mode = PersistentStoreMode.PERSISTENT; + return this; + } + + public StoreConfigBuilder<V> blob(){ + this.mode = PersistentStoreMode.BLOB_PERSISTENT; + return this; + } + + public PersistentStoreConfig<V> build(){ + Preconditions.checkNotNull(name); + return new PersistentStoreConfig<>(name, serializer, mode); + } + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreMode.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreMode.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreMode.java new file mode 100644 index 0000000..68d0cb6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreMode.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.sys; + +/** + * Defines operation mode of a {@link PersistentStore} instance. + */ +public enum PersistentStoreMode { + PERSISTENT, + BLOB_PERSISTENT +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java new file mode 100644 index 0000000..75b89b4 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.sys; + +import org.apache.drill.exec.exception.StoreException; + +/** + * A factory used to create {@link PersistentStore store} instances. + * + */ +public interface PersistentStoreProvider extends AutoCloseable { + /** + * Gets or creates a {@link PersistentStore persistent store} for the given configuration. + * + * Note that implementors have liberty to cache previous {@link PersistentStore store} instances. + * + * @param config store configuration + * @param <V> store value type + */ + <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException; + + + /** + * Sets up the provider. + */ + void start() throws Exception; +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreRegistry.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreRegistry.java new file mode 100644 index 0000000..b117513 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreRegistry.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.sys; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; + +import com.google.common.base.Preconditions; +import com.typesafe.config.ConfigException; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider; + +public class PersistentStoreRegistry<C extends ClusterCoordinator> { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PersistentStoreRegistry.class); + + private final DrillConfig config; + private final C coordinator; + + public PersistentStoreRegistry(C coordinator, DrillConfig config) { + this.coordinator = Preconditions.checkNotNull(coordinator, "coordinator cannot be null"); + this.config = Preconditions.checkNotNull(config, "config cannot be null"); + } + + public C getCoordinator() { + return this.coordinator; + } + + public DrillConfig getConfig() { + return this.config; + } + + @SuppressWarnings("unchecked") + public PersistentStoreProvider newPStoreProvider() throws ExecutionSetupException { + try { + String storeProviderClassName = config.getString(ExecConstants.SYS_STORE_PROVIDER_CLASS); + logger.info("Using the configured PStoreProvider class: '{}'.", storeProviderClassName); + Class<? extends PersistentStoreProvider> storeProviderClass = (Class<? 
extends PersistentStoreProvider>) Class.forName(storeProviderClassName); + Constructor<? extends PersistentStoreProvider> c = storeProviderClass.getConstructor(PersistentStoreRegistry.class); + return new CachingPersistentStoreProvider(c.newInstance(this)); + } catch (ConfigException.Missing | ClassNotFoundException | NoSuchMethodException | SecurityException + | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + logger.error(e.getMessage(), e); + throw new ExecutionSetupException("A System Table provider was either not specified or could not be found or instantiated", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java index 8b7225e..0a9b9b3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/StaticDrillTable.java @@ -17,12 +17,11 @@ */ package org.apache.drill.exec.store.sys; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.drill.exec.planner.logical.DrillTable; import org.apache.drill.exec.store.RecordDataType; import org.apache.drill.exec.store.StoragePlugin; -import org.apache.drill.exec.store.pojo.PojoDataType; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; /** * A {@link org.apache.drill.exec.planner.logical.DrillTable} with a defined schema http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java index 33f030b..4fb0475 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTablePlugin.java @@ -34,8 +34,8 @@ import org.apache.drill.exec.planner.logical.DrillTable; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.AbstractSchema; import org.apache.drill.exec.store.AbstractStoragePlugin; -import org.apache.drill.exec.store.pojo.PojoDataType; import org.apache.drill.exec.store.SchemaConfig; +import org.apache.drill.exec.store.pojo.PojoDataType; /** * A "storage" plugin for system tables. http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java index 8ade25c..2c3bba4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java @@ -21,7 +21,11 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.ExecutionSetupException; import 
org.apache.drill.common.expression.SchemaPath; @@ -36,11 +40,6 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import org.apache.drill.exec.store.StoragePluginRegistry; -import com.fasterxml.jackson.annotation.JacksonInject; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; - @JsonTypeName("sys") public class SystemTableScan extends AbstractGroupScan implements SubScan { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SystemTableScan.class); http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java index e9bc7ff..681119d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/ThreadsIterator.java @@ -17,13 +17,13 @@ */ package org.apache.drill.exec.store.sys; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; - import java.lang.management.ManagementFactory; import java.lang.management.ThreadMXBean; import java.util.Iterator; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; + public class ThreadsIterator implements Iterator<Object> { private boolean beforeFirst = true; http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionIterator.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionIterator.java index 5620ece..9bfb700 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/VersionIterator.java @@ -19,10 +19,8 @@ package org.apache.drill.exec.store.sys; import java.io.IOException; import java.net.URL; -import java.util.Enumeration; import java.util.Iterator; import java.util.Properties; -import java.util.jar.Manifest; import com.google.common.io.Resources; import org.apache.drill.common.util.DrillVersionInfo; http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java deleted file mode 100644 index ebee7a8..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java +++ /dev/null @@ -1,234 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.sys.local; - -import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX; - -import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map.Entry; - -import org.apache.commons.io.IOUtils; -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.store.dfs.DrillFileSystem; -import org.apache.drill.exec.store.sys.PStore; -import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -public class FilePStore<V> implements PStore<V> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilePStore.class); - - - private final Path basePath; - private final PStoreConfig<V> config; - private final DrillFileSystem fs; - - public FilePStore(DrillFileSystem fs, Path base, PStoreConfig<V> config) { - super(); - this.basePath = new Path(base, config.getName()); - this.config = config; - this.fs = fs; - - try { - mkdirs(basePath); - } catch (IOException e) { - throw new RuntimeException("Failure setting pstore configuration path."); - } - } - - private void mkdirs(Path path) throws IOException{ - fs.mkdirs(path); - } - - public static Path getLogDir(){ - String drillLogDir = 
System.getenv("DRILL_LOG_DIR"); - if (drillLogDir == null) { - drillLogDir = "/var/log/drill"; - } - return new Path(new File(drillLogDir).getAbsoluteFile().toURI()); - } - - public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throws IOException{ - Path blobRoot = root == null ? getLogDir() : root; - Configuration fsConf = new Configuration(); - if(blobRoot.toUri().getScheme() != null){ - fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, blobRoot.toUri().toString()); - } - - - DrillFileSystem fs = new DrillFileSystem(fsConf); - fs.mkdirs(blobRoot); - return fs; - } - - @Override - public Iterator<Entry<String, V>> iterator() { - try{ - List<FileStatus> f = fs.list(false, basePath); - if (f == null || f.isEmpty()) { - return Collections.emptyIterator(); - } - List<String> files = Lists.newArrayList(); - - for (FileStatus stat : f) { - String s = stat.getPath().getName(); - if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) { - files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length())); - } - } - - Collections.sort(files); - files = files.subList(0, Math.min(files.size(), config.getMaxIteratorSize())); - return new Iter(files.iterator()); - - }catch(IOException e){ - throw new RuntimeException(e); - } - } - - private Path makePath(String name) { - Preconditions.checkArgument( - !name.contains("/") && - !name.contains(":") && - !name.contains("..")); - - final Path path = new Path(basePath, name + DRILL_SYS_FILE_SUFFIX); - // do this to check file name. 
- return path; - } - - public V get(String key) { - try{ - Path path = makePath(key); - if(!fs.exists(path)){ - return null; - } - }catch(IOException e){ - throw new RuntimeException(e); - } - - final Path path = makePath(key); - try (InputStream is = fs.open(path)) { - return config.getSerializer().deserialize(IOUtils.toByteArray(is)); - } catch (IOException e) { - throw new RuntimeException("Unable to deserialize \"" + path + "\"", e); - } - } - - public void put(String key, V value) { - try (OutputStream os = fs.create(makePath(key))) { - IOUtils.write(config.getSerializer().serialize(value), os); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public boolean putIfAbsent(String key, V value) { - try { - Path p = makePath(key); - if (fs.exists(p)) { - return false; - } else { - put(key, value); - return true; - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public void delete(String key) { - try { - fs.delete(makePath(key), false); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private class Iter implements Iterator<Entry<String, V>>{ - - private Iterator<String> keys; - private String current; - - public Iter(Iterator<String> keys) { - super(); - this.keys = keys; - } - - @Override - public boolean hasNext() { - return keys.hasNext(); - } - - @Override - public Entry<String, V> next() { - current = keys.next(); - return new DeferredEntry(current); - } - - @Override - public void remove() { - delete(current); - keys.remove(); - } - - private class DeferredEntry implements Entry<String, V> { - - private String name; - - - public DeferredEntry(String name) { - super(); - this.name = name; - } - - @Override - public String getKey() { - return name; - } - - @Override - public V getValue() { - return get(name); - } - - @Override - public V setValue(V value) { - throw new UnsupportedOperationException(); - } - - } - } - - @Override - public void close() { - } - -} 
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java deleted file mode 100644 index e7c2f94..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalEStoreProvider.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.drill.exec.store.sys.local; - -import java.io.IOException; - -import org.apache.drill.exec.store.sys.EStore; -import org.apache.drill.exec.store.sys.EStoreProvider; -import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.drill.exec.store.sys.PStoreConfig.Mode; - -import com.google.common.base.Preconditions; - -public class LocalEStoreProvider implements EStoreProvider{ - - @Override - public <V> EStore<V> getStore(PStoreConfig<V> storeConfig) throws IOException { - Preconditions.checkArgument(storeConfig.getMode() == Mode.EPHEMERAL, "Estore configurations must be set ephemeral."); - - return new MapEStore<V>(); - } - - @Override - public void start() throws IOException { - } - - @Override - public void close() { - } - -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java index 3131290..cda1180 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/LocalPStoreProvider.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p/> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -17,77 +17,18 @@ */ package org.apache.drill.exec.store.sys.local; -import java.io.IOException; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.store.dfs.DrillFileSystem; -import org.apache.drill.exec.store.sys.PStore; -import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.drill.exec.store.sys.PStoreProvider; -import org.apache.drill.exec.store.sys.PStoreRegistry; -import org.apache.hadoop.fs.Path; +import org.apache.drill.exec.exception.StoreException; +import org.apache.drill.exec.store.sys.PersistentStoreRegistry; +import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider; /** - * A really simple provider that stores data in the local file system, one value per file. + * Kept for possible references to old class name in configuration. + * + * @deprecated will be removed in 1.7 + * use {@link org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider} instead. */ -public class LocalPStoreProvider implements PStoreProvider { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalPStoreProvider.class); - - private final Path path; - private final boolean enableWrite; - private final ConcurrentMap<PStoreConfig<?>, PStore<?>> pstores; - private final LocalEStoreProvider estoreProvider; - private final DrillFileSystem fs; - - public LocalPStoreProvider(DrillConfig config) throws IOException { - this.path = new Path(config.getString(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH)); - this.enableWrite = config.getBoolean(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE); - this.pstores = enableWrite ? 
null : new ConcurrentHashMap<PStoreConfig<?>, PStore<?>>(); - this.estoreProvider = new LocalEStoreProvider(); - this.fs = FilePStore.getFileSystem(config, path); - } - - public LocalPStoreProvider(PStoreRegistry registry) throws IOException { - this(registry.getConfig()); - } - - @Override - public void close() { +public class LocalPStoreProvider extends LocalPersistentStoreProvider { + public LocalPStoreProvider(PersistentStoreRegistry registry) throws StoreException { + super(registry); } - - @Override - public <V> PStore<V> getStore(PStoreConfig<V> storeConfig) throws IOException { - switch(storeConfig.getMode()){ - case EPHEMERAL: - return estoreProvider.getStore(storeConfig); - case BLOB_PERSISTENT: - case PERSISTENT: - return getPStore(storeConfig); - default: - throw new IllegalStateException(); - } - - } - - private <V> PStore<V> getPStore(PStoreConfig<V> storeConfig) throws IOException { - if (enableWrite) { - return new FilePStore<V>(fs, path, storeConfig); - } else { - PStore<V> p = new NoWriteLocalPStore<V>(); - PStore<?> p2 = pstores.putIfAbsent(storeConfig, p); - if(p2 != null) { - return (PStore<V>) p2; - } - return p; - } - } - - - @Override - public void start() { - } - } http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java deleted file mode 100644 index 96e51e6..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/MapEStore.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.drill.exec.store.sys.local; - -import org.apache.drill.exec.store.sys.EStore; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Implementation of EStore using ConcurrentHashMap. - * @param <V> - */ -public class MapEStore<V> implements EStore<V> { - ConcurrentHashMap<String, V> store = new ConcurrentHashMap<>(); - - @Override - public V get(String key) { - return store.get(key); - } - - @Override - public void put(String key, V value) { - store.put(key, value); - } - - @Override - public void delete(String key) { - store.remove(key); - } - - @Override - public Iterator<Map.Entry<String, V>> iterator() { - return store.entrySet().iterator(); - } - - @Override - public boolean putIfAbsent(String key, V value) { - V out = store.putIfAbsent(key, value); - return out == null; - } - - @Override - public void close() { - } -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java deleted file mode 100644 index c675618..0000000 
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/NoWriteLocalPStore.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.sys.local; - -import java.util.Iterator; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentMap; - -import org.apache.drill.exec.store.sys.PStore; - -import com.google.common.collect.Maps; - -public class NoWriteLocalPStore<V> implements PStore<V>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NoWriteLocalPStore.class); - - private ConcurrentMap<String, V> map = Maps.newConcurrentMap(); - - private ConcurrentMap<String, V> blobMap = Maps.newConcurrentMap(); - - public NoWriteLocalPStore() { - super(); - } - - @Override - public Iterator<Entry<String, V>> iterator() { - return map.entrySet().iterator(); - } - - @Override - public V get(String key) { - return map.get(key); - } - - @Override - public void put(String key, V value) { - map.put(key, value); - } - - @Override - public boolean putIfAbsent(String key, V value) { - return null == map.putIfAbsent(key, value); - } - - @Override - public void delete(String key) { - map.remove(key); - 
blobMap.remove(key); - } - - @Override - public void close() { - } - -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java deleted file mode 100644 index 53452f3..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/JacksonSerializer.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.store.sys.serialize; - -import java.io.IOException; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectReader; -import com.fasterxml.jackson.databind.ObjectWriter; - -public class JacksonSerializer<X> implements PClassSerializer<X> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JacksonSerializer.class); - - private ObjectWriter writer; - private ObjectReader reader; - - public JacksonSerializer(ObjectMapper mapper, Class<X> clazz){ - this.reader = mapper.reader(clazz); - this.writer = mapper.writer(); - } - - @Override - public byte[] serialize(X val) throws IOException { - return writer.writeValueAsBytes(val); - } - - @Override - public X deserialize(byte[] bytes) throws IOException { - return reader.readValue(bytes); - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((reader == null) ? 0 : reader.hashCode()); - result = prime * result + ((writer == null) ? 
0 : writer.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - JacksonSerializer other = (JacksonSerializer) obj; - if (reader == null) { - if (other.reader != null) { - return false; - } - } else if (!reader.equals(other.reader)) { - return false; - } - if (writer == null) { - if (other.writer != null) { - return false; - } - } else if (!writer.equals(other.writer)) { - return false; - } - return true; - } - - -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/PClassSerializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/PClassSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/PClassSerializer.java deleted file mode 100644 index a3b0c1b..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/PClassSerializer.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.sys.serialize; - -import java.io.IOException; - -public interface PClassSerializer<X> { - public byte[] serialize(X val) throws IOException; - public X deserialize(byte[] bytes) throws IOException; -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java deleted file mode 100644 index 52df7a4..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/serialize/ProtoSerializer.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.drill.exec.store.sys.serialize; - -import java.io.IOException; - -import org.apache.commons.io.output.ByteArrayOutputStream; - -import com.dyuproject.protostuff.JsonIOUtil; -import com.dyuproject.protostuff.Schema; -import com.google.protobuf.Message; - -public class ProtoSerializer<X, B extends Message.Builder> implements PClassSerializer<X> { - - private final Schema<X> writeSchema; - private final Schema<B> readSchema; - - public ProtoSerializer(Schema<X> writeSchema, Schema<B> readSchema) { - super(); - this.writeSchema = writeSchema; - this.readSchema = readSchema; - } - - @Override - public byte[] serialize(X val) throws IOException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - JsonIOUtil.writeTo(baos, val, writeSchema, false); - return baos.toByteArray(); - } - - @SuppressWarnings("unchecked") - @Override - public X deserialize(byte[] bytes) throws IOException { - B b = readSchema.newMessage(); - JsonIOUtil.mergeFrom(bytes, b, readSchema, false); - return (X) b.build(); - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((readSchema == null) ? 0 : readSchema.hashCode()); - result = prime * result + ((writeSchema == null) ? 
0 : writeSchema.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - ProtoSerializer other = (ProtoSerializer) obj; - if (readSchema == null) { - if (other.readSchema != null) { - return false; - } - } else if (!readSchema.equals(other.readSchema)) { - return false; - } - if (writeSchema == null) { - if (other.writeSchema != null) { - return false; - } - } else if (!writeSchema.equals(other.writeSchema)) { - return false; - } - return true; - } - - -} http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java new file mode 100644 index 0000000..1ef8d12 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.sys.store; + +import static org.apache.drill.exec.ExecConstants.DRILL_SYS_FILE_SUFFIX; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import javax.annotation.Nullable; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.commons.io.IOUtils; +import org.apache.drill.common.collections.ImmutableEntry; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.sys.BasePersistentStore; +import org.apache.drill.exec.store.sys.PersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreConfig; +import org.apache.drill.exec.store.sys.PersistentStoreMode; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +
/**
 * Store that keeps one value per file. Each key maps to the file
 * {@code <base>/<store name>/<key> + DRILL_SYS_FILE_SUFFIX} on the supplied file system, and
 * values are converted to and from bytes with the serializer taken from the store configuration.
 *
 * <p>The existence checks in {@link #get(String)} and {@link #putIfAbsent(String, Object)} are
 * separate file system calls from the read/write that follows them, so these operations are not
 * atomic with respect to other writers of the same directory.
 */
public class LocalPersistentStore<V> extends BasePersistentStore<V> {
  private static final Logger logger = LoggerFactory.getLogger(LocalPersistentStore.class);

  private final Path basePath;
  private final PersistentStoreConfig<V> config;
  private final DrillFileSystem fs;

  /**
   * Creates the store and makes sure its directory exists.
   *
   * @param fs     file system holding the store's files
   * @param base   parent directory; the store lives in the child named {@code config.getName()}
   * @param config store configuration supplying the name and the value serializer
   * @throws RuntimeException if the store directory cannot be created
   */
  public LocalPersistentStore(DrillFileSystem fs, Path base, PersistentStoreConfig<V> config) {
    super();
    this.basePath = new Path(base, config.getName());
    this.config = config;
    this.fs = fs;

    try {
      mkdirs(basePath);
    } catch (IOException e) {
      // Keep the underlying cause so the real reason (permissions, missing mount, ...) is visible.
      throw new RuntimeException("Failure setting pstore configuration path.", e);
    }
  }

  /** Always reports {@link PersistentStoreMode#PERSISTENT}. */
  @Override
  public PersistentStoreMode getMode() {
    return PersistentStoreMode.PERSISTENT;
  }

  private void mkdirs(Path path) throws IOException {
    fs.mkdirs(path);
  }

  /**
   * Returns the directory named by the {@code DRILL_LOG_DIR} environment variable, or
   * {@code /var/log/drill} when the variable is not set, as an absolute path.
   */
  public static Path getLogDir() {
    String drillLogDir = System.getenv("DRILL_LOG_DIR");
    if (drillLogDir == null) {
      drillLogDir = "/var/log/drill";
    }
    return new Path(new File(drillLogDir).getAbsoluteFile().toURI());
  }

  /**
   * Builds a file system for the given root and creates the root directory on it.
   *
   * @param config currently unused by this method
   * @param root   root directory of the store; {@link #getLogDir()} is used when {@code null}
   * @return a file system whose default name is the root's URI when that URI carries a scheme
   * @throws IOException if the file system cannot be created or the root cannot be made
   */
  public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throws IOException {
    final Path blobRoot = root == null ? getLogDir() : root;
    final Configuration fsConf = new Configuration();
    if (blobRoot.toUri().getScheme() != null) {
      fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, blobRoot.toUri().toString());
    }

    final DrillFileSystem fs = new DrillFileSystem(fsConf);
    fs.mkdirs(blobRoot);
    return fs;
  }

  /**
   * Returns a window over the stored entries, ordered by key.
   *
   * <p>Only the key list is computed eagerly; each value is read from its file when the
   * returned iterator reaches it.
   *
   * @param skip number of leading keys (in sorted order) to skip
   * @param take maximum number of entries to return
   * @return iterator over at most {@code take} entries
   * @throws RuntimeException if the store directory cannot be listed
   */
  @Override
  public Iterator<Map.Entry<String, V>> getRange(int skip, int take) {
    try {
      final List<FileStatus> statuses = fs.list(false, basePath);
      if (statuses == null || statuses.isEmpty()) {
        return Collections.emptyIterator();
      }

      // A key is the file name with the store suffix stripped; other files are ignored.
      final List<String> keys = Lists.newArrayList();
      for (FileStatus status : statuses) {
        final String fileName = status.getPath().getName();
        if (fileName.endsWith(DRILL_SYS_FILE_SUFFIX)) {
          keys.add(fileName.substring(0, fileName.length() - DRILL_SYS_FILE_SUFFIX.length()));
        }
      }

      Collections.sort(keys);
      return Iterables.transform(Iterables.limit(Iterables.skip(keys, skip), take), new Function<String, Entry<String, V>>() {
        @Nullable
        @Override
        public Entry<String, V> apply(String key) {
          return new ImmutableEntry<>(key, get(key));
        }
      }).iterator();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Maps a key to its backing file.
   *
   * @throws IllegalArgumentException if the key contains {@code "/"}, {@code ":"} or {@code ".."},
   *         any of which could make the resulting path leave the store directory
   */
  private Path makePath(String name) {
    Preconditions.checkArgument(
        !name.contains("/") &&
        !name.contains(":") &&
        !name.contains(".."),
        "Illegal key name: %s", name);

    return new Path(basePath, name + DRILL_SYS_FILE_SUFFIX);
  }

  /**
   * Reads the value stored under {@code key}.
   *
   * @return the deserialized value, or {@code null} when no file exists for the key
   * @throws RuntimeException if the file cannot be checked, read or deserialized
   */
  @Override
  public V get(String key) {
    // Resolve (and validate) the path once; it is used for both the check and the read.
    final Path path = makePath(key);
    try {
      if (!fs.exists(path)) {
        return null;
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    try (InputStream is = fs.open(path)) {
      return config.getSerializer().deserialize(IOUtils.toByteArray(is));
    } catch (IOException e) {
      throw new RuntimeException("Unable to deserialize \"" + path + "\"", e);
    }
  }

  /**
   * Serializes {@code value} into the file for {@code key}.
   *
   * @throws RuntimeException if serialization or the write fails
   */
  @Override
  public void put(String key, V value) {
    try (OutputStream os = fs.create(makePath(key))) {
      IOUtils.write(config.getSerializer().serialize(value), os);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Stores {@code value} only when no file exists yet for {@code key}.
   *
   * @return {@code true} if the value was written, {@code false} if the key was already present
   * @throws RuntimeException if the existence check or the write fails
   */
  @Override
  public boolean putIfAbsent(String key, V value) {
    try {
      if (fs.exists(makePath(key))) {
        return false;
      }
      put(key, value);
      return true;
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Removes the file backing {@code key} (non-recursive delete).
   *
   * @throws RuntimeException if the delete call fails
   */
  @Override
  public void delete(String key) {
    try {
      fs.delete(makePath(key), false);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  /** Does nothing; this class never closes the file system it was given. */
  @Override
  public void close() {
  }

}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java new file mode 100644 index 0000000..3dde4b8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.sys.store; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import javax.annotation.Nullable; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterators; +import org.apache.curator.framework.CuratorFramework; +import org.apache.drill.common.collections.ImmutableEntry; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.coord.zk.PathUtils; +import org.apache.drill.exec.coord.zk.ZookeeperClient; +import org.apache.drill.exec.exception.StoreException; +import org.apache.drill.exec.serialization.InstanceSerializer; +import org.apache.drill.exec.store.sys.BasePersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreConfig; +import org.apache.drill.exec.store.sys.PersistentStoreMode; +import org.apache.zookeeper.CreateMode; + +/** + * Zookeeper based implementation of {@link org.apache.drill.exec.store.sys.PersistentStore}. 
+ */
public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZookeeperPersistentStore.class);

  private final PersistentStoreConfig<V> config;
  private final ZookeeperClient client;

  /**
   * Creates a store rooted at the path {@code "/" + config.getName()}, using persistent nodes.
   *
   * @param framework Curator handle passed on to the underlying {@link ZookeeperClient}
   * @param config    store configuration supplying the name, mode and value serializer
   * @throws NullPointerException if {@code config} is {@code null}
   */
  public ZookeeperPersistentStore(final CuratorFramework framework, final PersistentStoreConfig<V> config) throws StoreException {
    this.config = Preconditions.checkNotNull(config);
    this.client = new ZookeeperClient(framework, PathUtils.join("/", config.getName()), CreateMode.PERSISTENT);
  }

  /** Starts the underlying client; call before any read or write. */
  public void start() throws Exception {
    client.start();
  }

  /** Reports the mode declared by the store configuration. */
  @Override
  public PersistentStoreMode getMode() {
    return config.getMode();
  }

  /**
   * Reads and deserializes the value stored under {@code key}.
   *
   * @return the value, or {@code null} when the client has no data for the key
   * @throws DrillRuntimeException if the stored bytes cannot be deserialized
   */
  @Override
  public V get(final String key) {
    final byte[] bytes = client.get(key);
    if (bytes == null) {
      return null;
    }
    try {
      return config.getSerializer().deserialize(bytes);
    } catch (final IOException e) {
      throw new DrillRuntimeException(String.format("unable to deserialize value at %s", key), e);
    }
  }

  /**
   * Serializes {@code value} and writes it under {@code key}.
   *
   * @throws DrillRuntimeException if the value cannot be serialized
   */
  @Override
  public void put(final String key, final V value) {
    final InstanceSerializer<V> serializer = config.getSerializer();
    try {
      final byte[] bytes = serializer.serialize(value);
      client.put(key, bytes);
    } catch (final IOException e) {
      // Guard the null case so a failed serialization is not masked by an NPE raised here.
      final Class<?> type = value == null ? null : value.getClass();
      throw new DrillRuntimeException(String.format("unable to serialize value of type %s", type), e);
    }
  }

  /**
   * Writes {@code value} only when {@link #get(String)} currently returns {@code null}.
   *
   * <p>The lookup and the write are two separate client calls, so this is a best-effort check
   * rather than an atomic operation.
   *
   * @return {@code true} if the value was written, {@code false} if a value already existed
   * @throws DrillRuntimeException if the existing value cannot be deserialized or the new value
   *         cannot be serialized
   */
  @Override
  public boolean putIfAbsent(final String key, final V value) {
    if (get(key) != null) {
      return false;
    }
    put(key, value);
    return true;
  }

  /** Removes the entry stored under {@code key}. */
  @Override
  public void delete(final String key) {
    client.delete(key);
  }

  /**
   * Returns a window over the client's entries with values deserialized on demand.
   *
   * @param skip number of leading entries to discard
   * @param take maximum number of entries to return
   * @return iterator whose {@code next()} throws {@link DrillRuntimeException} when an entry's
   *         bytes cannot be deserialized
   */
  @Override
  public Iterator<Map.Entry<String, V>> getRange(final int skip, final int take) {
    final Iterator<Map.Entry<String, byte[]>> entries = client.entries();
    Iterators.advance(entries, skip);
    return Iterators.transform(Iterators.limit(entries, take), new Function<Map.Entry<String, byte[]>, Map.Entry<String, V>>() {
      @Nullable
      @Override
      public Map.Entry<String, V> apply(@Nullable Map.Entry<String, byte[]> input) {
        try {
          final V value = config.getSerializer().deserialize(input.getValue());
          return new ImmutableEntry<>(input.getKey(), value);
        } catch (final IOException e) {
          throw new DrillRuntimeException(String.format("unable to deserialize value at key %s", input.getKey()), e);
        }
      }
    });
  }

  /** Closes the underlying client; a failure is logged and not propagated. */
  @Override
  public void close() {
    try {
      client.close();
    } catch (final Exception e) {
      // SLF4J substitutes "{}" (not "%s"); the trailing exception is logged with its stack trace.
      logger.warn("Failure while closing out {}.", getClass().getSimpleName(), e);
    }
  }

}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/BasePersistentStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/BasePersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/BasePersistentStoreProvider.java new file mode 100644 index 0000000..e497a4c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/BasePersistentStoreProvider.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.sys.store.provider; + +import org.apache.drill.exec.store.sys.PersistentStoreProvider; +
/**
 * Convenience base for {@link PersistentStoreProvider} implementations: both lifecycle hooks
 * are implemented as no-ops so that subclasses only override the ones they need.
 */
public abstract class BasePersistentStoreProvider implements PersistentStoreProvider {

  /** Does nothing. */
  @Override
  public void start() throws Exception {
    // intentionally empty
  }

  /** Does nothing. */
  @Override
  public void close() throws Exception {
    // intentionally empty
  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java new file mode 100644 index 0000000..99ccc8e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.sys.store.provider; + +import java.util.List; +import java.util.concurrent.ConcurrentMap; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.exec.exception.StoreException; +import org.apache.drill.exec.store.sys.PersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreConfig; +import org.apache.drill.exec.store.sys.PersistentStoreProvider; +
/**
 * Provider decorator that hands out at most one store per {@link PersistentStoreConfig}: the
 * first request for a configuration is delegated to the wrapped provider and the resulting
 * store is remembered and returned for every later request with an equal configuration.
 */
public class CachingPersistentStoreProvider extends BasePersistentStoreProvider {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CachingPersistentStoreProvider.class);

  private final ConcurrentMap<PersistentStoreConfig<?>, PersistentStore<?>> storeCache = Maps.newConcurrentMap();
  private final PersistentStoreProvider provider;

  /**
   * @param provider delegate that actually creates stores; it is started and closed together
   *                 with this provider
   */
  public CachingPersistentStoreProvider(PersistentStoreProvider provider) {
    this.provider = provider;
  }

  /**
   * Returns the cached store for {@code config}, creating it through the delegate on first use.
   *
   * <p>If two callers race to create the same store, the one that loses closes its freshly
   * created instance and returns the instance that was cached by the winner.
   *
   * @param config configuration identifying the store
   * @return the single cached store for {@code config}; never {@code null}
   * @throws StoreException if the delegate fails to create the store, or if closing a redundant
   *                        store after a lost race fails
   */
  @Override
  @SuppressWarnings("unchecked")
  public <V> PersistentStore<V> getOrCreateStore(final PersistentStoreConfig<V> config) throws StoreException {
    final PersistentStore<?> cached = storeCache.get(config);
    if (cached != null) {
      return (PersistentStore<V>) cached;
    }

    final PersistentStore<?> created = provider.getOrCreateStore(config);
    final PersistentStore<?> winner = storeCache.putIfAbsent(config, created);
    if (winner == null) {
      // Our instance is the one that ended up in the cache.
      return (PersistentStore<V>) created;
    }

    // Another caller cached a store first: discard ours and hand back theirs.
    try {
      created.close();
    } catch (Exception ex) {
      throw new StoreException(ex);
    }
    return (PersistentStore<V>) winner;
  }

  /** Starts the wrapped provider. */
  @Override
  public void start() throws Exception {
    provider.start();
  }

  /**
   * Closes every cached store and then the wrapped provider, and empties the cache.
   *
   * @throws Exception if closing any of the resources fails
   */
  @Override
  public void close() throws Exception {
    final List<AutoCloseable> closeables = Lists.newArrayList();
    for (final AutoCloseable store : storeCache.values()) {
      closeables.add(store);
    }
    closeables.add(provider);
    storeCache.clear();
    AutoCloseables.close(closeables);
  }

}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java new file mode 100644 index 0000000..9bf18ab --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.drill.exec.store.sys.store.provider; + +import java.io.IOException; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.StoreException; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.sys.PersistentStoreRegistry; +import org.apache.drill.exec.store.sys.PersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreConfig; +import org.apache.drill.exec.store.sys.store.LocalPersistentStore; +import org.apache.drill.exec.testing.store.NoWriteLocalStore; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +
/**
 * A really simple provider that stores data in the local file system, one value per file.
 *
 * <p>The root directory and the write switch are read from the configuration keys
 * {@link ExecConstants#SYS_STORE_PROVIDER_LOCAL_PATH} and
 * {@link ExecConstants#SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE}.
 */
public class LocalPersistentStoreProvider extends BasePersistentStoreProvider {
  private static final Logger logger = LoggerFactory.getLogger(LocalPersistentStoreProvider.class);

  private final Path path;
  private final DrillFileSystem fs;
  // This flag is used in testing. Ideally, tests should use a specific PersistentStoreProvider that knows
  // how to handle this flag.
  private final boolean enableWrite;

  /**
   * Creates a provider from the configuration held by {@code registry}.
   *
   * @throws StoreException if the backing file system cannot be obtained
   */
  public LocalPersistentStoreProvider(final PersistentStoreRegistry registry) throws StoreException {
    this(registry.getConfig());
  }

  /**
   * Creates a provider from {@code config}.
   *
   * @param config configuration supplying the local store path and the enable-write flag
   * @throws StoreException if the backing file system cannot be obtained
   */
  public LocalPersistentStoreProvider(final DrillConfig config) throws StoreException {
    this.path = new Path(config.getString(ExecConstants.SYS_STORE_PROVIDER_LOCAL_PATH));
    this.enableWrite = config.getBoolean(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE);
    try {
      this.fs = LocalPersistentStore.getFileSystem(config, path);
    } catch (IOException e) {
      throw new StoreException("unable to get filesystem", e);
    }
  }

  /**
   * Creates a store for {@code storeConfig}.
   *
   * @return a new {@link LocalPersistentStore} under the configured path when writes are
   *         enabled, otherwise a new {@link NoWriteLocalStore}
   * @throws IllegalStateException if the configuration's mode is neither
   *         {@code BLOB_PERSISTENT} nor {@code PERSISTENT}
   */
  @Override
  public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> storeConfig) {
    switch (storeConfig.getMode()) {
    case BLOB_PERSISTENT:
    case PERSISTENT:
      if (enableWrite) {
        return new LocalPersistentStore<>(fs, path, storeConfig);
      }
      return new NoWriteLocalStore<>();
    default:
      // Name the offending mode so a misconfigured store is easy to track down.
      throw new IllegalStateException("Unsupported persistent store mode: " + storeConfig.getMode());
    }
  }


  /** Closes the file system opened by the constructor. */
  @Override
  public void close() throws Exception {
    fs.close();
  }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/8126927f/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java new file mode 100644 index 0000000..58c46a7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/ZookeeperPersistentStoreProvider.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.sys.store.provider; + +import java.io.IOException; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.curator.framework.CuratorFramework; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; +import org.apache.drill.exec.exception.StoreException; +import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.exec.store.sys.PersistentStore; +import org.apache.drill.exec.store.sys.PersistentStoreRegistry; +import org.apache.drill.exec.store.sys.PersistentStoreConfig; +import org.apache.drill.exec.store.sys.store.LocalPersistentStore; +import org.apache.drill.exec.store.sys.store.ZookeeperPersistentStore; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +
/**
 * Provider that serves {@code PERSISTENT} stores from ZooKeeper and {@code BLOB_PERSISTENT}
 * stores from files under a blob root directory.
 *
 * <p>The blob root is read from the configuration key
 * {@code drill.exec.sys.store.provider.zk.blobroot}; when the key is absent,
 * {@link LocalPersistentStore#getLogDir()} is used instead.
 */
public class ZookeeperPersistentStoreProvider extends BasePersistentStoreProvider {
  private static final Logger logger = LoggerFactory.getLogger(ZookeeperPersistentStoreProvider.class);

  private static final String DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT = "drill.exec.sys.store.provider.zk.blobroot";

  private final CuratorFramework curator;
  private final DrillFileSystem fs;
  private final Path blobRoot;

  /**
   * Creates a provider from the registry's configuration and its coordinator's Curator handle.
   *
   * @throws StoreException if the blob file system cannot be obtained
   */
  public ZookeeperPersistentStoreProvider(final PersistentStoreRegistry<ZKClusterCoordinator> registry) throws StoreException {
    this(registry.getConfig(), registry.getCoordinator().getCurator());
  }

  /**
   * Creates a provider from an explicit configuration and Curator handle.
   *
   * @param config  configuration optionally supplying the blob root
   * @param curator Curator handle passed to every ZooKeeper-backed store created here
   * @throws StoreException if the blob file system cannot be obtained
   */
  @VisibleForTesting
  public ZookeeperPersistentStoreProvider(final DrillConfig config, final CuratorFramework curator) throws StoreException {
    this.curator = curator;

    if (config.hasPath(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT)) {
      blobRoot = new Path(config.getString(DRILL_EXEC_SYS_STORE_PROVIDER_ZK_BLOBROOT));
    } else {
      blobRoot = LocalPersistentStore.getLogDir();
    }

    try {
      this.fs = LocalPersistentStore.getFileSystem(config, blobRoot);
    } catch (IOException ex) {
      throw new StoreException("unable to get filesystem", ex);
    }
  }

  /**
   * Creates a store for {@code config} according to its mode.
   *
   * @return a {@link LocalPersistentStore} under the blob root for {@code BLOB_PERSISTENT}, or
   *         a started {@link ZookeeperPersistentStore} for {@code PERSISTENT}
   * @throws StoreException        if the ZooKeeper store cannot be created or started
   * @throws IllegalStateException if the configuration's mode is neither of the two above
   */
  @Override
  public <V> PersistentStore<V> getOrCreateStore(final PersistentStoreConfig<V> config) throws StoreException {
    switch (config.getMode()) {
    case BLOB_PERSISTENT:
      return new LocalPersistentStore<>(fs, blobRoot, config);
    case PERSISTENT:
      final ZookeeperPersistentStore<V> store = new ZookeeperPersistentStore<>(curator, config);
      try {
        store.start();
      } catch (Exception e) {
        throw new StoreException("unable to start zookeeper store", e);
      }
      return store;
    default:
      // Name the offending mode so a misconfigured store is easy to track down.
      throw new IllegalStateException("Unsupported persistent store mode: " + config.getMode());
    }
  }

  /** Closes the blob file system; the Curator handle is left open. */
  @Override
  public void close() throws Exception {
    fs.close();
  }
}
