[ 
https://issues.apache.org/jira/browse/FLINK-3035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15140916#comment-15140916
 ] 

ASF GitHub Bot commented on FLINK-3035:
---------------------------------------

Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1617#discussion_r52467132
  
    --- Diff: 
flink-contrib/flink-statebackend-redis/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRedisState.java
 ---
    @@ -0,0 +1,374 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.commons.io.FileUtils;
    +import org.apache.commons.io.IOUtils;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.KvState;
    +import org.apache.flink.runtime.state.KvStateSnapshot;
    +import org.apache.flink.util.HDFSCopyFromLocal;
    +import org.apache.flink.util.HDFSCopyToLocal;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import redis.clients.jedis.BinaryJedis;
    +import redis.clients.jedis.exceptions.JedisConnectionException;
    +
    +import java.io.BufferedReader;
    +import java.io.ByteArrayOutputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.net.URI;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +/**
    + * Base class for {@link State} implementations that store state in a 
Redis database.
    + *
    + * <p>This base class is responsible for setting up the Redis database, for
    + * checkpointing/restoring the database and for disposal in the {@link 
#dispose()} method. The
    + * concrete subclasses just use the Redis handle to store/retrieve state.
    + *
    + * @param <K> The type of the key.
    + * @param <N> The type of the namespace.
    + * @param <S> The type of {@link State}.
    + * @param <SD> The type of {@link StateDescriptor}.
    + * @param <Backend> The type of the backend that snapshots this key/value 
state.
    + */
    +public abstract class AbstractRedisState<K, N, S extends State, SD extends 
StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
    +   implements KvState<K, N, S, SD, Backend>, State {
    +
    +   /** Serializer for the keys */
    +   protected final TypeSerializer<K> keySerializer;
    +
    +   /** Serializer for the namespace */
    +   protected final TypeSerializer<N> namespaceSerializer;
    +
    +   /** The current key, which the next value methods will refer to */
    +   protected K currentKey;
    +
    +   /** The current namespace, which the next value methods will refer to */
    +   protected N currentNamespace;
    +
    +   /** Store it so that we can clean up in dispose() */
    +   protected final File dbPath;
    +
    +   protected final String checkpointPath;
    +
    +   /** Our Redis instance */
    +   protected Process redisServerProcess;
    +   protected final BinaryJedis db;
    +
    +   /**
    +    * Creates a new Redis backed state.
    +    *
    +    * @param redisExecPath The path on the local system where Redis 
executable file can be found.
    +    * @param port The port to start Redis server
    +    * @param keySerializer The serializer for the keys.
    +    * @param namespaceSerializer The serializer for the namespace.
    +    * @param dbPath The path on the local system where Redis data should 
be stored.
    +    */
    +   protected AbstractRedisState(String redisExecPath, int port, 
TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, File 
dbPath, String checkpointPath) {
    +           this.keySerializer = keySerializer;
    +           this.namespaceSerializer = namespaceSerializer;
    +           this.dbPath = dbPath;
    +           this.checkpointPath = checkpointPath;
    +
    +           if (!dbPath.exists()) {
    +                   if (!dbPath.mkdirs()) {
    +                           throw new RuntimeException("Could not create 
Redis data directory.");
    +                   }
    +           }
    +
    +           try {
    +                   startRedisServer(redisExecPath, "--port", 
String.valueOf(port), "--dir", dbPath.getAbsolutePath());
    --- End diff --
    
    Not sure, if the State should start the Redis server by itself... If Redis 
server runs externally and is not started up before a job is deployed, we might 
want to fail the job. (Not sure though; @aljoscha what do you think about this?)


> Redis as State Backend
> ----------------------
>
>                 Key: FLINK-3035
>                 URL: https://issues.apache.org/jira/browse/FLINK-3035
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Matthias J. Sax
>            Assignee: Subhobrata Dey
>            Priority: Minor
>
> Add Redis as a state backend for distributed snapshots.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to