Github user mjsax commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1617#discussion_r52466186
  
    --- Diff: 
flink-contrib/flink-statebackend-redis/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRedisState.java
 ---
    @@ -0,0 +1,374 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * <p/>
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * <p/>
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.contrib.streaming.state;
    +
    +import org.apache.commons.io.FileUtils;
    +import org.apache.commons.io.IOUtils;
    +import org.apache.flink.api.common.state.State;
    +import org.apache.flink.api.common.state.StateDescriptor;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +import org.apache.flink.runtime.state.AbstractStateBackend;
    +import org.apache.flink.runtime.state.KvState;
    +import org.apache.flink.runtime.state.KvStateSnapshot;
    +import org.apache.flink.util.HDFSCopyFromLocal;
    +import org.apache.flink.util.HDFSCopyToLocal;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import redis.clients.jedis.BinaryJedis;
    +import redis.clients.jedis.exceptions.JedisConnectionException;
    +
    +import java.io.BufferedReader;
    +import java.io.ByteArrayOutputStream;
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.net.URI;
    +import java.util.Arrays;
    +import java.util.List;
    +
    +/**
    + * Base class for {@link State} implementations that store state in a 
Redis database.
    + *
    + * <p>This base class is responsible for setting up the Redis database, for
    + * checkpointing/restoring the database and for disposal in the {@link 
#dispose()} method. The
    + * concrete subclasses just use the Redis handle to store/retrieve state.
    + *
    + * @param <K> The type of the key.
    + * @param <N> The type of the namespace.
    + * @param <S> The type of {@link State}.
    + * @param <SD> The type of {@link StateDescriptor}.
    + * @param <Backend> The type of the backend that snapshots this key/value 
state.
    + */
    +public abstract class AbstractRedisState<K, N, S extends State, SD extends 
StateDescriptor<S, ?>, Backend extends AbstractStateBackend>
    +   implements KvState<K, N, S, SD, Backend>, State {
    +
    +   /** Serializer for the keys */
    +   protected final TypeSerializer<K> keySerializer;
    +
    +   /** Serializer for the namespace */
    +   protected final TypeSerializer<N> namespaceSerializer;
    +
    +   /** The current key, which the next value methods will refer to */
    +   protected K currentKey;
    +
    +   /** The current namespace, which the next value methods will refer to */
    +   protected N currentNamespace;
    +
    +   /** Store it so that we can clean up in dispose() */
    +   protected final File dbPath;
    +
    +   protected final String checkpointPath;
    +
    +   /** Our Redis instance */
    +   protected Process redisServerProcess;
    +   protected final BinaryJedis db;
    +
    +   /**
    +    * Creates a new Redis backed state.
    +    *
    +    * @param redisExecPath The path on the local system where Redis 
executable file can be found.
    +    * @param port The port to start Redis server
    +    * @param keySerializer The serializer for the keys.
    +    * @param namespaceSerializer The serializer for the namespace.
    +    * @param dbPath The path on the local system where Redis data should 
be stored.
    +    */
    +   protected AbstractRedisState(String redisExecPath, int port, 
TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, File 
dbPath, String checkpointPath) {
    +           this.keySerializer = keySerializer;
    +           this.namespaceSerializer = namespaceSerializer;
    +           this.dbPath = dbPath;
    +           this.checkpointPath = checkpointPath;
    +
    +           if (!dbPath.exists()) {
    +                   if (!dbPath.mkdirs()) {
    +                           throw new RuntimeException("Could not create 
Redis data directory.");
    +                   }
    +           }
    +
    +           try {
    +                   startRedisServer(redisExecPath, "--port", 
String.valueOf(port), "--dir", dbPath.getAbsolutePath());
    +           } catch (IOException e) {
    +                   throw new RuntimeException("Could not start Redis 
server", e);
    +           }
    +
    +           try {
    +                   db = new BinaryJedis("localhost", port);
    +                   db.ping();
    +           } catch (JedisConnectionException e) {
    +                   throw new RuntimeException("Error while opening 
connection to Redis server", e);
    +           }
    +   }
    +
    +   /**
    +    * Creates a new Redis backed state and restores from the given backup 
directory. After
    +    * restoring the backup directory is deleted.
    +    *
    +    * @param redisExecPath The path on the local system where Redis 
executable file can be found.
    +    * @param port The port to start Redis server
    +    * @param keySerializer The serializer for the keys.
    +    * @param namespaceSerializer The serializer for the namespace.
    +    * @param dbPath The path on the local system where RocksDB data should 
be stored.
    +    * @param restorePath The path to a backup directory from which to 
restore Redis database.
    +    */
    +   protected AbstractRedisState(String redisExecPath, int port, 
TypeSerializer<K> keySerializer, TypeSerializer<N> namespaceSerializer, File 
dbPath, String checkpointPath, String restorePath) {
    +           try {
    +                   HDFSCopyToLocal.copyToLocal(new URI(restorePath + 
"/dump.rdb"), dbPath);
    +           } catch (Exception e) {
    +                   throw new RuntimeException("Error while restoring Redis 
state from " + restorePath, e);
    +           }
    +
    +           try {
    +                   startRedisServer(redisExecPath, "--port", 
String.valueOf(port), "--dir", dbPath.getAbsolutePath());
    +           } catch (IOException e) {
    +                   throw new RuntimeException("Could not start Redis 
server");
    +           }
    +
    +           this.keySerializer = keySerializer;
    +           this.namespaceSerializer = namespaceSerializer;
    +           this.dbPath = dbPath;
    +           this.checkpointPath = checkpointPath;
    +
    +           if (!dbPath.exists()) {
    +                   if (!dbPath.mkdirs()) {
    +                           throw new RuntimeException("Could not create 
Redis data directory.");
    +                   }
    +           }
    +
    +           try {
    +                   db = new BinaryJedis("localhost", port);
    +                   db.ping();
    +           } catch (JedisConnectionException e) {
    +                   throw new RuntimeException("Error while opening 
connection to Redis server", e);
    +           }
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +
    +   @Override
    +   final public void clear() {
    +           ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +           DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(baos);
    +           try {
    +                   writeKeyAndNamespace(out);
    +                   byte[] key = baos.toByteArray();
    +                   db.del(key);
    +           } catch (IOException e) {
    +                   throw new RuntimeException("Error while removing entry 
from Redis", e);
    +           }
    +   }
    +
    +   protected void writeKeyAndNamespace(DataOutputView out) throws 
IOException {
    +           keySerializer.serialize(currentKey, out);
    +           out.writeByte(42);
    +           namespaceSerializer.serialize(currentNamespace, out);
    +   }
    +
    +   @Override
    +   public void setCurrentKey(K currentKey) {
    +           this.currentKey = currentKey;
    +   }
    +
    +   @Override
    +   public void setCurrentNamespace(N currentNamespace) {
    +           this.currentNamespace = currentNamespace;
    +   }
    +
    +   protected abstract KvStateSnapshot<K, N, S, SD, Backend> 
createRedisSnapshot(URI backupUri, long checkpointId);
    +
    +   @Override
    +   public KvStateSnapshot<K, N, S, SD, Backend> snapshot(long 
checkpointId, long timestamp) throws Exception {
    +           boolean success = false;
    +
    +           final File localBackupPath = new File(dbPath, "backup-" + 
checkpointId);
    +           final URI backupUri = new URI(checkpointPath + "/chk-" + 
checkpointId);
    +
    +           try {
    +                   if (!localBackupPath.exists()) {
    +                           if (!localBackupPath.mkdirs()) {
    +                                   throw new RuntimeException("Could not 
create local backup path " + localBackupPath);
    +                           }
    +                   }
    +
    +                   db.configSet("dir".getBytes(), 
localBackupPath.getAbsolutePath().getBytes());
    +                   db.save();
    +
    +                   HDFSCopyFromLocal.copyFromLocal(localBackupPath, 
backupUri);
    +                   KvStateSnapshot<K, N, S, SD, Backend> result = 
createRedisSnapshot(backupUri, checkpointId);
    +                   success = true;
    +                   return result;
    +           }finally {
    +                   FileUtils.deleteDirectory(localBackupPath);
    +                   if (!success) {
    +                           FileSystem fs = FileSystem.get(backupUri, new 
Configuration());
    +                           fs.delete(new Path(backupUri), true);
    +                   }
    +           }
    +   }
    +
    +   @Override
    +   public void dispose() {
    +           db.disconnect();
    +           stopRedisServer();
    +           try {
    +                   FileUtils.deleteDirectory(dbPath);
    +           } catch (IOException e) {
    +                   throw new RuntimeException("Error disposing Redis data 
directory.", e);
    +           }
    +   }
    +
    +   /**
    +    * Starts the Redis server
    +    *
    +    * @param args
    +    * @throws IOException
    +     */
    +   private synchronized void startRedisServer(String... args) throws 
IOException {
    +           List<String> argsList = Arrays.asList(args);
    +           File executable = new File(argsList.get(0));
    +           ProcessBuilder pb = new ProcessBuilder(argsList);
    +           pb.directory(executable.getParentFile());
    +
    +           redisServerProcess = pb.start();
    +
    +           BufferedReader reader = new BufferedReader(new 
InputStreamReader(redisServerProcess.getInputStream()));
    +           try {
    +                   String outputLine;
    +                   do {
    +                           outputLine = reader.readLine();
    +                           if (outputLine == null) {
    +                                   //Something goes wrong. Stream is ended 
before server was activated.
    +                                   throw new RuntimeException("Can't start 
redis server. Check logs for details.");
    +                           }
    +                   } while (!outputLine.matches(".*The server is now ready 
to accept connections on port.*"));
    +           } finally {
    +                   IOUtils.closeQuietly(reader);
    +           }
    +   }
    +
    +   /**
    +    * Stops the Redis server
    +    */
    +   private synchronized void stopRedisServer() {
    +           try {
    +                   redisServerProcess.destroy();
    +                   redisServerProcess.waitFor();
    +           } catch (InterruptedException e) {
    +                   throw new RuntimeException("Failed to stop redis 
instance", e);
    +           }
    +   }
    +
    +   public static abstract class AbstractRedisSnapshot<K, N, S extends 
State, SD extends StateDescriptor<S, ?>, Backend extends AbstractStateBackend> 
implements KvStateSnapshot<K, N, S, SD, Backend> {
    +           private static final long serialVersionUID = 1L;
    +
    +           private static final Logger LOG = 
LoggerFactory.getLogger(AbstractRedisSnapshot.class);
    +
    +           // 
------------------------------------------------------------------------
    +           //  Ctor parameters for Redis state
    +           // 
------------------------------------------------------------------------
    +
    +           /** Store it so that we can clean up in dispose() */
    +           protected final File dbPath;
    +
    +           /** Where we should put Redis backups */
    +           protected final String checkpointPath;
    +
    +           // 
------------------------------------------------------------------------
    +           //  Info about this checkpoint
    +           // 
------------------------------------------------------------------------
    +
    +           protected final URI backupUri;
    +
    +           protected long checkpointId;
    +
    +           // 
------------------------------------------------------------------------
    +           //  For sanity checks
    +           // 
------------------------------------------------------------------------
    +
    +           /** Key serializer */
    +           protected final TypeSerializer<K> keySerializer;
    +
    +           /** Namespace serializer */
    +           protected final TypeSerializer<N> namespaceSerializer;
    +
    +           /** Hash of the StateDescriptor, for sanity checks */
    +           protected final SD stateDesc;
    +
    +           public AbstractRedisSnapshot(File dbPath, String 
checkpointPath, URI backupUri, long checkpointId, TypeSerializer<K> 
keySerializer, TypeSerializer<N> namespaceSerializer, SD stateDesc) {
    +                   this.dbPath = dbPath;
    +                   this.checkpointPath = checkpointPath;
    +                   this.backupUri = backupUri;
    +                   this.checkpointId = checkpointId;
    +
    +                   this.stateDesc = stateDesc;
    +                   this.keySerializer = keySerializer;
    +                   this.namespaceSerializer = namespaceSerializer;
    +           }
    +
    +           protected abstract KvState<K, N, S, SD, Backend> 
createRedisState(TypeSerializer<K> keySerializer, TypeSerializer<N> 
namespaceSerializer, SD stateDesc, File dbPath, String backupPath, String 
restorePath) throws Exception;
    +
    +           @Override
    +           public KvState<K, N, S, SD, Backend> restoreState(Backend 
stateBackend, TypeSerializer<K> keySerializer, ClassLoader classLoader, long 
recoveryTimestamp) throws Exception {
    +
    +                   // validity checks
    +                   if (!this.keySerializer.equals(keySerializer)) {
    +                           throw new IllegalArgumentException(
    +                                   "Cannot restore the state from the 
snapshot with the given serializers. " +
    +                                           "State (K/V) was serialized 
with " +
    +                                           "(" + keySerializer + ") " +
    +                                           "now is (" + keySerializer + 
")");
    +                   }
    +
    +                   if (!dbPath.exists()) {
    +                           if (!dbPath.mkdirs()) {
    +                                   throw new RuntimeException("Could not 
create Redis base path " + dbPath);
    +                           }
    +                   }
    +
    +                   FileSystem fs = FileSystem.get(backupUri, new 
Configuration());
    --- End diff --
    
    Variable `fs` is never used...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to