[ 
https://issues.apache.org/jira/browse/DRILL-4275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15147906#comment-15147906
 ] 

ASF GitHub Bot commented on DRILL-4275:
---------------------------------------

Github user hnfgns commented on a diff in the pull request:

    https://github.com/apache/drill/pull/374#discussion_r52954081
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZkEphemeralStore.java
 ---
    @@ -0,0 +1,145 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.coord.zk;
    +
    +import java.io.IOException;
    +import java.util.Iterator;
    +import java.util.Map;
    +
    +import javax.annotation.Nullable;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.base.Function;
    +import com.google.common.collect.Iterators;
    +import com.google.common.collect.Lists;
    +import org.apache.curator.framework.CuratorFramework;
    +import 
org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    +import org.apache.drill.common.collections.ImmutableEntry;
    +import org.apache.drill.common.exceptions.DrillRuntimeException;
    +import org.apache.drill.exec.coord.store.BaseTransientStore;
    +import org.apache.drill.exec.coord.store.TransientStoreConfig;
    +import org.apache.drill.exec.coord.store.TransientStoreEvent;
    +import org.apache.drill.exec.serialization.InstanceSerializer;
    +import org.apache.zookeeper.CreateMode;
    +
    +public class ZkEphemeralStore<V> extends BaseTransientStore<V> {
    +
    +  @VisibleForTesting
    +  protected final PathChildrenCacheListener dispatcher = new 
EventDispatcher<>(this);
    +  private final ZookeeperClient client;
    +
    +  public ZkEphemeralStore(final TransientStoreConfig<V> config, final 
CuratorFramework curator) {
    +    super(config);
    +    this.client = new ZookeeperClient(curator, PathUtils.join("/", 
config.getName()), CreateMode.EPHEMERAL);
    +  }
    +
    +  public void start() throws Exception {
    +    getClient().getCache().getListenable().addListener(dispatcher);
    +    getClient().start();
    +  }
    +
    +  protected ZookeeperClient getClient() {
    +    return client;
    +  }
    +
    +  @Override
    +  public V get(final String key) {
    +    final byte[] bytes = getClient().get(key);
    +    if (bytes == null) {
    +      return null;
    +    }
    +    try {
    +      return config.getSerializer().deserialize(bytes);
    +    } catch (final IOException e) {
    +      throw new DrillRuntimeException(String.format("unable to deserialize 
value at %s", key), e);
    +    }
    +  }
    +
    +  @Override
    +  public V put(final String key, final V value) {
    +    final InstanceSerializer<V> serializer = config.getSerializer();
    +    try {
    +      final byte[] old = getClient().get(key);
    +      final byte[] bytes = serializer.serialize(value);
    +      getClient().put(key, bytes);
    +      if (old == null) {
    +        return null;
    +      }
    +      return serializer.deserialize(old);
    +    } catch (final IOException e) {
    +      throw new DrillRuntimeException(String.format("unable to 
de/serialize value of type %s", value.getClass()), e);
    +    }
    +  }
    +
    +  @Override
    +  public V putIfAbsent(final String key, final V value) {
    +    final V old = get(key);
    +    if (old == null) {
    +      try {
    +        final byte[] bytes = config.getSerializer().serialize(value);
    +        getClient().put(key, bytes);
    +      } catch (final IOException e) {
    +        throw new DrillRuntimeException(String.format("unable to serialize 
value of type %s", value.getClass()), e);
    +      }
    +    }
    +    return old;
    +  }
    +
    +  @Override
    +  public V remove(final String key) {
    +    final V existing = get(key);
    +    if (existing != null) {
    +      getClient().delete(key);
    +    }
    +    return existing;
    +  }
    +
    +  @Override
    +  public Iterator<Map.Entry<String, V>> entries() {
    +    return Iterators.transform(getClient().entries(), new 
Function<Map.Entry<String, byte[]>, Map.Entry<String, V>>() {
    +      @Nullable
    +      @Override
    +      public Map.Entry<String, V> apply(@Nullable Map.Entry<String, 
byte[]> input) {
    +        try {
    +          final V value = 
config.getSerializer().deserialize(input.getValue());
    +          return new ImmutableEntry<>(input.getKey(), value);
    +        } catch (final IOException e) {
    +          throw new DrillRuntimeException(String.format("unable to 
deserialize value at key %s", input.getKey()), e);
    +        }
    +      }
    +    });
    +  }
    +
    +  @Override
    +  public int size() {
    +    return Lists.newArrayList(entries()).size();
    --- End diff --
    
    yep. made it so that it avoids extraneous deserialization step.


> Refactor e/pstore interfaces and their factories to provide a unified 
> mechanism to access stores
> ------------------------------------------------------------------------------------------------
>
>                 Key: DRILL-4275
>                 URL: https://issues.apache.org/jira/browse/DRILL-4275
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Flow
>            Reporter: Hanifi Gunes
>            Assignee: Deneche A. Hakim
>
> We rely on E/PStore interfaces to persist data. Even though E/PStore stands 
> for Ephemeral and Persistent stores respectively, the current design for 
> EStore does not extend the interface/functionality of PStore at all, which 
> hints abstraction for EStore is redundant. This issue proposes a new unified 
> Store interface replacing the old E/PStore that exposes an additional method 
> that report persistence level as follows:
> {code:title=Store interface}
> interface Store<V> {
>   StoreMode getMode();
>   V get(String key);
>   ...
> }
> enum StoreMode {
>   EPHEMERAL,
>   PERSISTENT,
>   ...
> }
> {code}
> The new design brings in less redundancy, more centralized code, ease to 
> reason and maintain.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to