[ https://issues.apache.org/jira/browse/DRILL-4275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15147906#comment-15147906 ]
ASF GitHub Bot commented on DRILL-4275: --------------------------------------- Github user hnfgns commented on a diff in the pull request: https://github.com/apache/drill/pull/374#discussion_r52954081 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java --- @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.coord.zk; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import javax.annotation.Nullable; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.drill.common.collections.ImmutableEntry; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.coord.store.BaseTransientStore; +import org.apache.drill.exec.coord.store.TransientStoreConfig; +import org.apache.drill.exec.coord.store.TransientStoreEvent; +import org.apache.drill.exec.serialization.InstanceSerializer; +import org.apache.zookeeper.CreateMode; + +public class ZkEphemeralStore<V> extends BaseTransientStore<V> { + + @VisibleForTesting + protected final PathChildrenCacheListener dispatcher = new EventDispatcher<>(this); + private final ZookeeperClient client; + + public ZkEphemeralStore(final TransientStoreConfig<V> config, final CuratorFramework curator) { + super(config); + this.client = new ZookeeperClient(curator, PathUtils.join("/", config.getName()), CreateMode.EPHEMERAL); + } + + public void start() throws Exception { + getClient().getCache().getListenable().addListener(dispatcher); + getClient().start(); + } + + protected ZookeeperClient getClient() { + return client; + } + + @Override + public V get(final String key) { + final byte[] bytes = getClient().get(key); + if (bytes == null) { + return null; + } + try { + return config.getSerializer().deserialize(bytes); + } catch (final IOException e) { + throw new DrillRuntimeException(String.format("unable to deserialize value at %s", key), e); + } + } + + @Override + public V put(final String key, final V value) { + final InstanceSerializer<V> serializer = config.getSerializer(); + try { + final byte[] old = getClient().get(key); + final byte[] bytes = serializer.serialize(value); + getClient().put(key, bytes); + if (old == null) { + return null; + } + return serializer.deserialize(old); + } catch (final IOException e) { + throw new DrillRuntimeException(String.format("unable to de/serialize value of type %s", value.getClass()), e); + } + } + + @Override + public V putIfAbsent(final String key, final V value) { + final V old = get(key); + if (old == null) { + try { + final byte[] bytes = config.getSerializer().serialize(value); + getClient().put(key, bytes); + } catch (final IOException e) { + throw new DrillRuntimeException(String.format("unable to serialize value of type %s", value.getClass()), e); + } + } + return old; + } + + @Override + public V remove(final String key) { + final V existing = get(key); + if (existing != null) { + getClient().delete(key); + } + return existing; + } + + @Override + public Iterator<Map.Entry<String, V>> entries() { + return Iterators.transform(getClient().entries(), new Function<Map.Entry<String, byte[]>, Map.Entry<String, V>>() { + @Nullable + @Override + public Map.Entry<String, V> apply(@Nullable Map.Entry<String, byte[]> input) { + try { + final V value = config.getSerializer().deserialize(input.getValue()); + return new ImmutableEntry<>(input.getKey(), value); + } catch (final IOException e) { + throw new DrillRuntimeException(String.format("unable to deserialize value at key %s", input.getKey()), e); + } + } + }); + } + + @Override + public int size() { + return Lists.newArrayList(entries()).size(); --- End diff -- yep. made it so that it avoids extraneous deserialization step. > Refactor e/pstore interfaces and their factories to provide a unified > mechanism to access stores > ------------------------------------------------------------------------------------------------ > > Key: DRILL-4275 > URL: https://issues.apache.org/jira/browse/DRILL-4275 > Project: Apache Drill > Issue Type: Improvement > Components: Execution - Flow > Reporter: Hanifi Gunes > Assignee: Deneche A. Hakim > > We rely on E/PStore interfaces to persist data. Even though E/PStore stands > for Ephemeral and Persistent stores respectively, the current design for > EStore does not extend the interface/functionality of PStore at all, which > hints abstraction for EStore is redundant. This issue proposes a new unified > Store interface replacing the old E/PStore that exposes an additional method > that report persistence level as follows: > {code:title=Store interface} > interface Store<V> { > StoreMode getMode(); > V get(String key); > ... > } > enum StoreMode { > EPHEMERAL, > PERSISTENT, > ... > } > {code} > The new design brings in less redundancy, more centralized code, ease to > reason and maintain. -- This message was sent by Atlassian JIRA (v6.3.4#6332)