HDFS-13421. [PROVIDED Phase 2] Implement DNA_BACKUP command in Datanode. 
Contributed by Ewan Higgs.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/959f49b4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/959f49b4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/959f49b4

Branch: refs/heads/HDFS-12090
Commit: 959f49b4803bae374da086b1891655f90e5502e5
Parents: 2c52ff5
Author: Virajith Jalaparti <viraj...@apache.org>
Authored: Wed Aug 1 12:13:31 2018 -0700
Committer: Ewan Higgs <ewan.hi...@wdc.com>
Committed: Fri Aug 10 13:34:28 2018 +0200

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/BlockInputStream.java    |  52 ++++++++
 .../hdfs/server/datanode/BPOfferService.java    |   6 +
 .../hadoop/hdfs/server/datanode/DataNode.java   |  20 ++-
 .../SyncServiceSatisfierDatanodeWorker.java     |  97 +++++++++++++++
 .../SyncTaskExecutionFeedbackCollector.java     |  54 ++++++++
 .../executor/BlockSyncOperationExecutor.java    | 122 +++++++++++++++++++
 .../executor/BlockSyncReaderFactory.java        |  92 ++++++++++++++
 .../executor/BlockSyncTaskRunner.java           |  69 +++++++++++
 .../hadoop/hdfs/TestBlockInputStream.java       |  84 +++++++++++++
 .../TestBlockSyncOperationExecutor.java         |  94 ++++++++++++++
 10 files changed, 689 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/959f49b4/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockInputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockInputStream.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockInputStream.java
new file mode 100644
index 0000000..152f83e
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/BlockInputStream.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * {@link InputStream} facade over a {@link BlockReader}, so that block data
+ * can be handed to APIs that consume plain Java streams.
+ *
+ * <p>The stream owns the reader it wraps: closing the stream closes the
+ * reader.
+ */
+public class BlockInputStream extends InputStream {
+  private final BlockReader blockReader;
+  /** Set once the reader has been closed, so it is closed at most once. */
+  private boolean closed;
+
+  /**
+   * @param blockReader reader that supplies the bytes of this stream.
+   */
+  public BlockInputStream(BlockReader blockReader) {
+    this.blockReader = blockReader;
+  }
+
+  /**
+   * Reads a single byte from the underlying reader.
+   *
+   * @return the byte as an unsigned value (0-255), or -1 if the reader did
+   *         not deliver a byte.
+   * @throws IOException if the underlying reader fails.
+   */
+  @Override
+  public int read() throws IOException {
+    byte[] b = new byte[1];
+    int c = blockReader.read(b, 0, b.length);
+    if (c > 0) {
+      // Mask the byte: a plain widening conversion sign-extends, so data
+      // bytes >= 0x80 would be returned as negative values and 0xFF would be
+      // indistinguishable from the -1 end-of-stream marker.
+      return b[0] & 0xff;
+    }
+    return -1;
+  }
+
+  /**
+   * Reads up to {@code len} bytes into {@code b}, delegating to the reader.
+   *
+   * @param b destination buffer.
+   * @param off offset in {@code b} at which to start storing bytes.
+   * @param len maximum number of bytes to read.
+   * @return whatever the underlying reader reports for this read.
+   * @throws IOException if the underlying reader fails.
+   */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    return blockReader.read(b, off, len);
+  }
+
+  /**
+   * Skips bytes by delegating to the reader.
+   *
+   * @param n number of bytes to skip.
+   * @return the number of bytes the reader reports as skipped.
+   * @throws IOException if the underlying reader fails.
+   */
+  @Override
+  public long skip(long n) throws IOException {
+    return blockReader.skip(n);
+  }
+
+  /**
+   * Closes the underlying reader. Calling this more than once is harmless;
+   * only the first call reaches the reader.
+   *
+   * @throws IOException if the underlying reader fails to close.
+   */
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      closed = true;
+      blockReader.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/959f49b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index a25f6a9..b8eef5e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -795,6 +795,12 @@ class BPOfferService {
           ((BlockECReconstructionCommand) cmd).getECTasks();
       dn.getErasureCodingWorker().processErasureCodingTasks(ecTasks);
       break;
+    case DatanodeProtocol.DNA_BACKUP:
+      LOG.info("DatanodeCommand action: DNA_BACKUP");
+      Collection<BlockSyncTask> backupTasks =
+          ((SyncCommand) cmd).getSyncTasks();
+      dn.getSyncServiceSatisfierDatanodeWorker().processSyncTasks(backupTasks);
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/959f49b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 1e9c57a..8c675d9 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -386,6 +386,7 @@ public class DataNode extends ReconfigurableBase
   private String dnUserName = null;
   private BlockRecoveryWorker blockRecoveryWorker;
   private ErasureCodingWorker ecWorker;
+  private SyncServiceSatisfierDatanodeWorker 
syncServiceSatisfierDatanodeWorker;
   private final Tracer tracer;
   private final TracerConfigurationManager tracerConfigurationManager;
   private static final int NUM_CORES = Runtime.getRuntime()
@@ -1425,6 +1426,9 @@ public class DataNode extends ReconfigurableBase
 
     ecWorker = new ErasureCodingWorker(getConf(), this);
     blockRecoveryWorker = new BlockRecoveryWorker(this);
+    syncServiceSatisfierDatanodeWorker =
+        new SyncServiceSatisfierDatanodeWorker(getConf(), this);
+    syncServiceSatisfierDatanodeWorker.start();
 
     blockPoolManager = new BlockPoolManager(this);
     blockPoolManager.refreshNamenodes(getConf());
@@ -1976,7 +1980,12 @@ public class DataNode extends ReconfigurableBase
         }
       }
     }
-    
+
+    // stop syncServiceSatisfierDatanodeWorker
+    if (syncServiceSatisfierDatanodeWorker != null) {
+      syncServiceSatisfierDatanodeWorker.stop();
+    }
+
     List<BPOfferService> bposArray = (this.blockPoolManager == null)
         ? new ArrayList<BPOfferService>()
         : this.blockPoolManager.getAllNamenodeThreads();
@@ -2129,6 +2138,11 @@ public class DataNode extends ReconfigurableBase
       notifyAll();
     }
     tracer.close();
+
+    // Wait for the sync service worker threads to finish their tasks.
+    if (syncServiceSatisfierDatanodeWorker != null) {
+      syncServiceSatisfierDatanodeWorker.waitToFinishWorkerThread();
+    }
   }
 
   /**
@@ -3616,4 +3630,8 @@ public class DataNode extends ReconfigurableBase
     }
     return this.diskBalancer;
   }
+
+  /**
+   * Returns the worker that is handed the sync tasks received by this
+   * Datanode.
+   *
+   * @return the worker held by this Datanode; presumably null until the
+   *         Datanode has created it, since the shutdown code null-checks the
+   *         field before using it.
+   */
+  public SyncServiceSatisfierDatanodeWorker 
getSyncServiceSatisfierDatanodeWorker() {
+    return syncServiceSatisfierDatanodeWorker;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/959f49b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SyncServiceSatisfierDatanodeWorker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SyncServiceSatisfierDatanodeWorker.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SyncServiceSatisfierDatanodeWorker.java
new file mode 100644
index 0000000..7216e8f
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SyncServiceSatisfierDatanodeWorker.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.conf.Configuration;
+import 
org.apache.hadoop.hdfs.server.datanode.syncservice.SyncTaskExecutionFeedbackCollector;
+import 
org.apache.hadoop.hdfs.server.datanode.syncservice.executor.BlockSyncOperationExecutor;
+import 
org.apache.hadoop.hdfs.server.datanode.syncservice.executor.BlockSyncReaderFactory;
+import 
org.apache.hadoop.hdfs.server.datanode.syncservice.executor.BlockSyncTaskRunner;
+import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Datanode-side worker of the sync service. It receives
+ * {@link BlockSyncTask}s, runs each of them on a thread pool and stores the
+ * outcome in a {@link SyncTaskExecutionFeedbackCollector}.
+ */
+public class SyncServiceSatisfierDatanodeWorker {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(SyncServiceSatisfierDatanodeWorker.class);
+
+  /** Number of threads that execute sync tasks concurrently. */
+  private static final int SYNC_TASK_THREADS = 4;
+  /** How long, in minutes, to wait for running tasks after a stop. */
+  private static final long TERMINATION_TIMEOUT_MINUTES = 3;
+
+  private ExecutorService executorService;
+  private final BlockSyncOperationExecutor syncOperationExecutor;
+  private final SyncTaskExecutionFeedbackCollector
+      syncTaskExecutionFeedbackCollector;
+
+  /**
+   * Creates the worker together with its thread pool, so tasks can be
+   * submitted as soon as the object exists.
+   *
+   * @param conf configuration handed to the block readers and uploaders.
+   * @param dataNode the Datanode on whose behalf blocks are read.
+   * @throws IOException declared for callers; kept for compatibility.
+   */
+  public SyncServiceSatisfierDatanodeWorker(Configuration conf,
+      DataNode dataNode) throws IOException {
+    this.executorService =
+        HadoopExecutors.newFixedThreadPool(SYNC_TASK_THREADS);
+    this.syncOperationExecutor =
+        BlockSyncOperationExecutor.createOnDataNode(conf,
+            (locatedBlock, config) -> {
+              try {
+                return BlockSyncReaderFactory.createBlockReader(dataNode,
+                    locatedBlock, config);
+              } catch (IOException e) {
+                // BiFunction cannot throw checked exceptions.
+                throw new RuntimeException(e);
+              }
+            });
+    this.syncTaskExecutionFeedbackCollector =
+        new SyncTaskExecutionFeedbackCollector();
+  }
+
+  /**
+   * Makes sure a usable thread pool exists. The constructor already creates
+   * one, so a new pool is only built when the previous one has been shut
+   * down; replacing a live pool would orphan it without a shutdown.
+   */
+  public void start() {
+    if (executorService.isShutdown()) {
+      executorService =
+          HadoopExecutors.newFixedThreadPool(SYNC_TASK_THREADS);
+    }
+  }
+
+  /**
+   * Stops accepting new tasks. Tasks that were already submitted keep
+   * running.
+   */
+  public void stop() {
+    this.executorService.shutdown();
+  }
+
+  /**
+   * Blocks until the thread pool has terminated, the timeout elapses or the
+   * calling thread is interrupted, whichever happens first.
+   */
+  public void waitToFinishWorkerThread() {
+    try {
+      boolean terminated = this.executorService.awaitTermination(
+          TERMINATION_TIMEOUT_MINUTES, TimeUnit.MINUTES);
+      if (!terminated) {
+        LOG.warn("SyncServiceSatisfierDatanodeWorker did not terminate "
+            + "within {} minutes.", TERMINATION_TIMEOUT_MINUTES);
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("SyncServiceSatisfierDatanodeWorker interrupted during "
+          + "waiting for finalization.");
+      // Preserve the interrupt for the caller.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Submits every task to the thread pool. A task the pool refuses (for
+   * example after {@link #stop()}) is logged and skipped; the remaining
+   * tasks are still submitted.
+   *
+   * @param blockSyncTasks tasks to execute.
+   */
+  public void processSyncTasks(Collection<BlockSyncTask> blockSyncTasks) {
+    LOG.debug("Received SyncTasks: {}", blockSyncTasks);
+    for (BlockSyncTask blockSyncTask : blockSyncTasks) {
+      try {
+        executorService.submit(new BlockSyncTaskRunner(blockSyncTask,
+            syncOperationExecutor,
+            syncTaskExecutionFeedbackCollector::addFeedback));
+      } catch (RejectedExecutionException e) {
+        // Log the exception itself: its cause is normally null, which would
+        // hide why the task was rejected.
+        LOG.warn("BlockSyncTask {} for {} was rejected",
+            blockSyncTask.getSyncTaskId(), blockSyncTask.getRemoteURI(), e);
+      }
+    }
+  }
+
+  /**
+   * @return the collector holding the feedback of the executed tasks.
+   */
+  public SyncTaskExecutionFeedbackCollector
+      getSyncTaskExecutionFeedbackCollector() {
+    return syncTaskExecutionFeedbackCollector;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/959f49b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/SyncTaskExecutionFeedbackCollector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/SyncTaskExecutionFeedbackCollector.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/SyncTaskExecutionFeedbackCollector.java
new file mode 100644
index 0000000..41cd441
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/SyncTaskExecutionFeedbackCollector.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.syncservice;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hdfs.server.protocol.BlockSyncTaskExecutionFeedback;
+import org.apache.hadoop.hdfs.server.protocol.BulkSyncTaskExecutionFeedback;
+
+import java.util.List;
+
+/**
+ * SyncTaskExecutionFeedbackCollector gathers the feedback of executed sync
+ * tasks so that the sync service tracker can determine what has happened and
+ * report statistics. Access to the pending feedback is guarded by this
+ * object's monitor.
+ */
+public class SyncTaskExecutionFeedbackCollector {
+
+  private List<BlockSyncTaskExecutionFeedback> collectedFeedback;
+
+  public SyncTaskExecutionFeedbackCollector() {
+    collectedFeedback = Lists.newArrayList();
+  }
+
+  /**
+   * Appends one feedback entry to the pending batch.
+   *
+   * @param feedback outcome of a single block sync task.
+   */
+  public synchronized void addFeedback(
+      BlockSyncTaskExecutionFeedback feedback) {
+    collectedFeedback.add(feedback);
+  }
+
+  /**
+   * Hands out everything collected so far as one bulk object and starts a
+   * fresh, empty batch.
+   *
+   * @return the feedback gathered since the previous call.
+   */
+  public BulkSyncTaskExecutionFeedback packageFeedbackForHeartbeat() {
+    // The bulk object is built outside the monitor; only the swap is locked.
+    return new BulkSyncTaskExecutionFeedback(drainFeedback());
+  }
+
+  /** Swaps the pending list for an empty one and returns the old list. */
+  private synchronized List<BlockSyncTaskExecutionFeedback> drainFeedback() {
+    List<BlockSyncTaskExecutionFeedback> drained = collectedFeedback;
+    collectedFeedback = Lists.newArrayList();
+    return drained;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/959f49b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncOperationExecutor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncOperationExecutor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncOperationExecutor.java
new file mode 100644
index 0000000..7fde230
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncOperationExecutor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.syncservice.executor;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BBUploadHandle;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.MultipartUploader;
+import org.apache.hadoop.fs.MultipartUploaderFactory;
+import org.apache.hadoop.fs.PartHandle;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockInputStream;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.DFSInputStream;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask;
+import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.SequenceInputStream;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.Vector;
+
+/**
+ * BlockSyncOperationExecutor writes the blocks to the sync service remote
+ * endpoint.
+ *
+ * <p>Each {@link BlockSyncTask} is uploaded as one part of a multipart
+ * upload: the task's blocks are read through {@link BlockReader}s,
+ * concatenated into a single stream and passed to a
+ * {@link MultipartUploader}.
+ */
+public class BlockSyncOperationExecutor  {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlockSyncOperationExecutor.class);
+
+  private final Configuration conf;
+  private final BiFunction<LocatedBlock, Configuration, BlockReader>
+      createBlockReader;
+  private final Function<FileSystem, MultipartUploader>
+      multipartUploaderSupplier;
+
+  /**
+   * @param conf configuration used for the remote file system and readers.
+   * @param createBlockReader factory producing a reader for a block.
+   * @param multipartUploaderSupplier factory producing the uploader for a
+   *        file system.
+   */
+  @VisibleForTesting
+  BlockSyncOperationExecutor(Configuration conf,
+      BiFunction<LocatedBlock, Configuration, BlockReader> createBlockReader,
+      Function<FileSystem, MultipartUploader> multipartUploaderSupplier) {
+    this.conf = conf;
+    this.createBlockReader = createBlockReader;
+    this.multipartUploaderSupplier = multipartUploaderSupplier;
+  }
+
+  /**
+   * Creates an executor whose uploader comes from
+   * {@link MultipartUploaderFactory}.
+   *
+   * @param conf configuration used for the remote file system and readers.
+   * @param createBlockReader factory producing a reader for a block.
+   * @return the executor.
+   */
+  public static BlockSyncOperationExecutor createOnDataNode(Configuration conf,
+      BiFunction<LocatedBlock, Configuration, BlockReader> createBlockReader) {
+    return new BlockSyncOperationExecutor(conf,
+        createBlockReader,
+        fs -> {
+          try {
+            return MultipartUploaderFactory.get(fs, conf);
+          } catch (IOException e) {
+            // Function cannot throw checked exceptions.
+            throw new RuntimeException(e);
+          }
+        });
+  }
+
+  /**
+   * Uploads the blocks of the given task as one part of a multipart upload.
+   *
+   * @param blockSyncTask task describing the blocks, the target and the part.
+   * @return the part handle bytes and the length taken from the task.
+   * @throws Exception if reading the blocks or uploading the part fails.
+   */
+  public SyncTaskExecutionResult execute(BlockSyncTask blockSyncTask)
+      throws Exception {
+    LOG.info("Executing BlockSyncTask {} (on {})",
+        blockSyncTask.getSyncTaskId(), blockSyncTask.getRemoteURI());
+
+    return doMultiPartPart(
+        blockSyncTask.getRemoteURI(),
+        blockSyncTask.getLocatedBlocks(),
+        blockSyncTask.getPartNumber(),
+        blockSyncTask.getUploadHandle(),
+        blockSyncTask.getOffset(),
+        blockSyncTask.getLength());
+  }
+
+  /**
+   * Streams the blocks to the uploader as part {@code partNumber}. The block
+   * streams are closed when the upload returns or fails.
+   */
+  private SyncTaskExecutionResult doMultiPartPart(URI uri,
+      List<LocatedBlock> locatedBlocks, int partNumber, byte[] uploadHandle,
+      int offset, long length) throws IOException {
+    FileSystem fs = FileSystem.get(uri, conf);
+    Path filePath = new Path(uri);
+    // Obtain the uploader first so a failure here cannot leak open readers.
+    MultipartUploader mpu = multipartUploaderSupplier.apply(fs);
+    try (InputStream partStream = openPartStream(locatedBlocks, offset)) {
+      PartHandle partHandle = mpu.putPart(filePath, partStream, partNumber,
+          BBUploadHandle.from(ByteBuffer.wrap(uploadHandle)), length);
+      return new SyncTaskExecutionResult(partHandle.bytes(), length);
+    }
+  }
+
+  /**
+   * Opens one stream per block, positions the first one at {@code offset}
+   * and concatenates them. If anything fails on the way, the streams opened
+   * so far are closed before the failure is rethrown.
+   */
+  private InputStream openPartStream(List<LocatedBlock> locatedBlocks,
+      long offset) throws IOException {
+    Vector<InputStream> blockStreams = new Vector<>(locatedBlocks.size());
+    try {
+      for (LocatedBlock locatedBlock : locatedBlocks) {
+        BlockReader reader = createBlockReader.apply(locatedBlock, conf);
+        blockStreams.add(new BlockInputStream(reader));
+        if (blockStreams.size() == 1) {
+          // Only the first block of the part starts at an offset.
+          skipFully(reader, offset);
+        }
+      }
+      // SequenceInputStream requires an Enumeration, hence the Vector.
+      return new SequenceInputStream(blockStreams.elements());
+    } catch (IOException | RuntimeException e) {
+      closeQuietly(blockStreams);
+      throw e;
+    }
+  }
+
+  /**
+   * Skips exactly {@code offset} bytes. A single skip call may skip fewer
+   * bytes than requested; uploading from the wrong position would silently
+   * corrupt the part, so the skip is repeated until complete and fails when
+   * the reader makes no progress.
+   */
+  private static void skipFully(BlockReader reader, long offset)
+      throws IOException {
+    long remaining = offset;
+    do {
+      long skipped = reader.skip(remaining);
+      if (remaining > 0 && skipped <= 0) {
+        throw new IOException("Could not skip to offset " + offset
+            + ": " + remaining + " bytes left to skip");
+      }
+      remaining -= skipped;
+    } while (remaining > 0);
+  }
+
+  /** Closes every stream, logging instead of propagating close failures. */
+  private static void closeQuietly(Vector<InputStream> streams) {
+    for (InputStream stream : streams) {
+      try {
+        stream.close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close block input stream", e);
+      }
+    }
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/959f49b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncReaderFactory.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncReaderFactory.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncReaderFactory.java
new file mode 100644
index 0000000..cc5eb5c
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncReaderFactory.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.syncservice.executor;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FsTracer;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.ClientContext;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
+import org.apache.hadoop.hdfs.net.Peer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.EnumSet;
+
+/**
+ * BlockSyncReaderFactory constructs a block reader in the Datanode for the
+ * Sync Command to read blocks that will be written to the synchronization
+ * remote endpoint.
+ */
+public class BlockSyncReaderFactory {
+
+  /**
+   * Builds a checksum-verifying reader over the whole block, reading from
+   * the first location listed in {@code locatedBlock}.
+   *
+   * @param dataNode Datanode used to obtain a READ access token.
+   * @param locatedBlock block to read, including its locations.
+   * @param conf configuration for the client context and sockets.
+   * @return a reader starting at offset 0 of the block.
+   * @throws IOException if the block has no location or the reader cannot
+   *         be built.
+   */
+  public static BlockReader createBlockReader(DataNode dataNode,
+      LocatedBlock locatedBlock, Configuration conf) throws IOException {
+    DatanodeInfo[] locations = locatedBlock.getLocations();
+    if (locations == null || locations.length == 0) {
+      throw new IOException("No datanode location available for block "
+          + locatedBlock.getBlock());
+    }
+    DatanodeInfo datanodeInfo = locations[0];
+
+    ClientContext clientContext = ClientContext.getFromConf(conf);
+    Token<BlockTokenIdentifier> accessToken = dataNode.getBlockAccessToken(
+        locatedBlock.getBlock(),
+        EnumSet.of(BlockTokenIdentifier.AccessMode.READ),
+        locatedBlock.getStorageTypes(), locatedBlock.getStorageIDs());
+
+    // No socket is opened here: the connection used for reading is created
+    // on demand by the remote peer factory below. Opening one eagerly would
+    // leak a connection per call because nothing would ever close it.
+    return new BlockReaderFactory(new DfsClientConf(conf))
+        .setConfiguration(conf)
+        .setBlock(locatedBlock.getBlock())
+        .setBlockToken(accessToken)
+        .setStartOffset(0)
+        .setLength(locatedBlock.getBlock().getNumBytes())
+        .setInetSocketAddress(datanodeInfo.getResolvedAddress())
+        .setVerifyChecksum(true)
+        .setDatanodeInfo(datanodeInfo)
+        .setClientName("BlockSyncOperationExecutor")
+        .setCachingStrategy(CachingStrategy.newDefaultStrategy())
+        .setRemotePeerFactory(
+            (addr, blockToken, datanodeId) -> connectPeer(conf, addr))
+        .setClientCacheContext(clientContext)
+        .build();
+  }
+
+  /**
+   * Connects to {@code addr} with the HDFS read timeout and wraps the socket
+   * in a {@link Peer}. The socket is closed if the peer cannot be created.
+   */
+  private static Peer connectPeer(Configuration conf, InetSocketAddress addr)
+      throws IOException {
+    Peer peer = null;
+    Socket sock = NetUtils.getDefaultSocketFactory(conf).createSocket();
+    try {
+      sock.connect(addr, HdfsConstants.READ_TIMEOUT);
+      sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+      peer = DFSUtilClient.peerFromSocket(sock);
+    } finally {
+      if (peer == null) {
+        IOUtils.closeQuietly(sock);
+      }
+    }
+    return peer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/959f49b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncTaskRunner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncTaskRunner.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncTaskRunner.java
new file mode 100644
index 0000000..660e39e
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/BlockSyncTaskRunner.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.syncservice.executor;
+
+import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask;
+import org.apache.hadoop.hdfs.server.protocol.BlockSyncTaskExecutionFeedback;
+import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.Consumer;
+
+/**
+ * BlockSyncTaskRunner glues together the sync task and the feedback reporting.
+ * It executes one {@link BlockSyncTask} and publishes exactly one feedback
+ * object describing the outcome.
+ */
+public class BlockSyncTaskRunner implements Runnable {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BlockSyncTaskRunner.class);
+
+  private final BlockSyncTask blockSyncTask;
+  private final BlockSyncOperationExecutor syncOperationExecutor;
+
+  private final Consumer<BlockSyncTaskExecutionFeedback>
+      publishOutcomeCallback;
+
+  /**
+   * @param blockSyncTask task to execute.
+   * @param syncOperationExecutor executor that performs the task.
+   * @param publishOutcomeCallback receives the feedback for the task.
+   */
+  public BlockSyncTaskRunner(BlockSyncTask blockSyncTask,
+      BlockSyncOperationExecutor syncOperationExecutor,
+      Consumer<BlockSyncTaskExecutionFeedback> publishOutcomeCallback) {
+    this.blockSyncTask = blockSyncTask;
+    this.syncOperationExecutor = syncOperationExecutor;
+    this.publishOutcomeCallback = publishOutcomeCallback;
+  }
+
+  /**
+   * Executes the task and reports success or failure through the callback.
+   */
+  @Override
+  public void run() {
+    LOG.info("Executing BlockSyncTask {} (on {})",
+        blockSyncTask.getSyncTaskId(), blockSyncTask.getRemoteURI());
+    BlockSyncTaskExecutionFeedback feedback;
+    try {
+      SyncTaskExecutionResult result =
+          syncOperationExecutor.execute(blockSyncTask);
+      feedback = BlockSyncTaskExecutionFeedback
+          .finishedSuccessfully(blockSyncTask.getSyncTaskId(),
+              blockSyncTask.getSyncMountId(),
+              result);
+    } catch (Exception e) {
+      LOG.error("Exception executing BlockSyncTask {} (on {})",
+          blockSyncTask.getSyncTaskId(), blockSyncTask.getRemoteURI(), e);
+      feedback = BlockSyncTaskExecutionFeedback
+          .failedWithException(blockSyncTask.getSyncTaskId(),
+              blockSyncTask.getSyncMountId(), e);
+    }
+    // Publish outside the try block: a callback failure after a successful
+    // upload must not be turned into a second, contradictory feedback.
+    publishOutcomeCallback.accept(feedback);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/959f49b4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockInputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockInputStream.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockInputStream.java
new file mode 100644
index 0000000..43d4881
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockInputStream.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test the BlockInputStream facade.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestBlockInputStream {
+  @Mock
+  private BlockReader blockReaderMock;
+
+  /** A single-byte read returns the byte the reader put into the buffer. */
+  @Test
+  public void testBlockInputStreamReadChar() throws IOException {
+    BlockInputStream is = new BlockInputStream(blockReaderMock);
+    // Emulate a reader that delivers exactly one byte into the buffer.
+    when(blockReaderMock.read(any(), eq(0), eq(1)))
+        .thenAnswer(invocation -> {
+          byte[] buf = (byte[]) invocation.getArguments()[0];
+          buf[0] = 97;
+          return 1;
+        });
+
+    assertEquals(97, is.read());
+    verify(blockReaderMock, times(1)).read(any(), eq(0), eq(1));
+  }
+
+  /** A single-byte read reports end of stream when the reader does. */
+  @Test
+  public void testBlockInputStreamReadEof() throws IOException {
+    BlockInputStream is = new BlockInputStream(blockReaderMock);
+    when(blockReaderMock.read(any(), eq(0), eq(1))).thenReturn(-1);
+
+    assertEquals(-1, is.read());
+    verify(blockReaderMock, times(1)).read(any(), eq(0), eq(1));
+  }
+
+  /** A buffer read is forwarded unchanged and its result passed through. */
+  @Test
+  public void testBlockInputStreamReadBuf() throws IOException {
+    BlockInputStream is = new BlockInputStream(blockReaderMock);
+    byte[] buf = new byte[1024];
+    when(blockReaderMock.read(buf, 0, buf.length)).thenReturn(1024);
+
+    assertEquals(1024, is.read(buf, 0, buf.length));
+    verify(blockReaderMock, times(1)).read(buf, 0, buf.length);
+  }
+
+  /** A skip is forwarded unchanged and its result passed through. */
+  @Test
+  public void testBlockInputStreamSkip() throws IOException {
+    BlockInputStream is = new BlockInputStream(blockReaderMock);
+    when(blockReaderMock.skip(10)).thenReturn(10L);
+
+    long ret = is.skip(10);
+
+    assertEquals(10, ret);
+    verify(blockReaderMock, times(1)).skip(10L);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/959f49b4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/TestBlockSyncOperationExecutor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/TestBlockSyncOperationExecutor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/TestBlockSyncOperationExecutor.java
new file mode 100644
index 0000000..e16d086
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/syncservice/executor/TestBlockSyncOperationExecutor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.syncservice.executor;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BBPartHandle;
+import org.apache.hadoop.fs.MultipartUploader;
+import org.apache.hadoop.fs.PartHandle;
+import org.apache.hadoop.hdfs.BlockReader;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockSyncTask;
+import org.apache.hadoop.hdfs.server.protocol.SyncTaskExecutionResult;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for BlockSyncOperationExecutor with mocked collaborators.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestBlockSyncOperationExecutor {
+
+  @Mock
+  private BlockReader blockReaderMock;
+
+  @Mock
+  private MultipartUploader multipartUploaderMock;
+
+  /**
+   * Runs one sync task and checks the reported byte count and part handle.
+   */
+  @Test
+  public void executeMultipartPutFileSyncTask() throws Exception {
+    long blockLength = 42L;
+    BlockSyncOperationExecutor blockSyncOperationExecutor =
+        new BlockSyncOperationExecutor(new Configuration(),
+            (locatedBlock, config) -> blockReaderMock,
+            fs -> multipartUploaderMock);
+    // Pin the charset so the handle bytes do not depend on the platform.
+    byte[] uploadHandle =
+        "uploadHandle".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    PartHandle partHandle = BBPartHandle.from(ByteBuffer.wrap(uploadHandle));
+    when(multipartUploaderMock.putPart(any(), any(), anyInt(), any(),
+        anyLong())).thenReturn(partHandle);
+    Block block = new Block(42L, blockLength, 44L);
+    LocatedBlock locatedBlock =
+        new LocatedBlock(new ExtendedBlock("poolId", block), null);
+    List<LocatedBlock> locatedBlocks = Lists.newArrayList(locatedBlock);
+    int partNumber = 85;
+    int offset = 0;
+    long length = locatedBlock.getBlockSize();
+    BlockSyncTask blockSyncTask = new BlockSyncTask(UUID.randomUUID(),
+        new URI("remoteUri"), locatedBlocks, partNumber, uploadHandle,
+        offset, length, "syncMountId");
+
+    SyncTaskExecutionResult result =
+        blockSyncOperationExecutor.execute(blockSyncTask);
+
+    assertThat(result).isNotNull();
+    assertThat(result.getNumberOfBytes()).isEqualTo(blockLength);
+    assertThat(result.getResult()).isEqualTo(partHandle.bytes());
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to