Repository: incubator-nifi
Updated Branches:
  refs/heads/develop fed987e77 -> 05cc6f045


NIFI-212: When comparing .tar files for differences, rather than comparing 
MD5SUM's of .tar files, ensure that .tar files have the same number of entries 
and that those entries are identical. This prevents issues that arise if the 
TAR metadata within the file differs


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/05cc6f04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/05cc6f04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/05cc6f04

Branch: refs/heads/develop
Commit: 05cc6f045d056e3be3f9c71dbc6f7400a4c01f08
Parents: fed987e
Author: Mark Payne <[email protected]>
Authored: Tue Dec 30 10:00:24 2014 -0500
Committer: Mark Payne <[email protected]>
Committed: Tue Dec 30 10:00:24 2014 -0500

----------------------------------------------------------------------
 .../nifi/cluster/flow/impl/DataFlowDaoImpl.java | 63 +++++++++++++++++---
 1 file changed, 56 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05cc6f04/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
----------------------------------------------------------------------
diff --git 
a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
index b7b142b..72b594a 100644
--- 
a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
+++ 
b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java
@@ -24,6 +24,9 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
 import java.util.UUID;
 
 import javax.xml.bind.JAXBContext;
@@ -40,6 +43,10 @@ import javax.xml.transform.TransformerFactory;
 import javax.xml.transform.dom.DOMSource;
 import javax.xml.transform.stream.StreamResult;
 
+import org.apache.commons.compress.archivers.ArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.nifi.cluster.flow.ClusterDataFlow;
 import org.apache.nifi.cluster.flow.DaoException;
 import org.apache.nifi.cluster.flow.DataFlowDao;
@@ -48,16 +55,12 @@ import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.StandardDataFlow;
 import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
-import org.apache.nifi.util.file.FileUtils;
+import org.apache.nifi.logging.NiFiLog;
 import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.stream.io.ByteArrayInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.logging.NiFiLog;
-
-import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
-import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
-import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
+import org.apache.nifi.util.file.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.w3c.dom.Document;
@@ -174,14 +177,60 @@ public class DataFlowDaoImpl implements DataFlowDao {
                     FileUtils.copyFile(restoreFlowStateFile, 
primaryFlowStateFile, false, false, logger);
                 } else {
                     // sync the primary copy with the restore copy
-                    FileUtils.syncWithRestore(primaryFlowStateFile, 
restoreFlowStateFile, logger);
+                    syncWithRestore(primaryFlowStateFile, 
restoreFlowStateFile);
                 }
 
             }
         } catch (final IOException | IllegalArgumentException | 
IllegalStateException | JAXBException ex) {
             throw new DaoException(ex);
         }
+    }
+    
+    
+    private void syncWithRestore(final File primaryFile, final File 
restoreFile) throws IOException {
+        try (final FileInputStream primaryFis = new 
FileInputStream(primaryFile);
+             final TarArchiveInputStream primaryIn = new 
TarArchiveInputStream(primaryFis);
+             final FileInputStream restoreFis = new 
FileInputStream(restoreFile);
+             final TarArchiveInputStream restoreIn = new 
TarArchiveInputStream(restoreFis)) {
+            
+            final ArchiveEntry primaryEntry = primaryIn.getNextEntry();
+            final ArchiveEntry restoreEntry = restoreIn.getNextEntry();
+
+            if ( primaryEntry == null && restoreEntry == null ) {
+                return;
+            }
 
+            if ( (primaryEntry == null && restoreEntry != null) || 
(primaryEntry != null && restoreEntry == null) ) {
+                throw new IllegalStateException(String.format("Primary file 
'%s' is different than restore file '%s'",
+                        primaryFile.getAbsoluteFile(), 
restoreFile.getAbsolutePath()));
+            }
+            
+            final byte[] primaryMd5 = calculateMd5(primaryIn);
+            final byte[] restoreMd5 = calculateMd5(restoreIn);
+            
+            if ( !Arrays.equals(primaryMd5, restoreMd5) ) {
+                throw new IllegalStateException(String.format("Primary file 
'%s' is different than restore file '%s'",
+                        primaryFile.getAbsoluteFile(), 
restoreFile.getAbsolutePath()));
+            }
+        }
+    }
+    
+    private byte[] calculateMd5(final InputStream in) throws IOException {
+        final MessageDigest digest;
+        try {
+            digest = MessageDigest.getInstance("MD5");
+        } catch (final NoSuchAlgorithmException nsae) {
+            throw new IOException(nsae);
+        }
+        
+        int len;
+        final byte[] buffer = new byte[8192];
+        while ((len = in.read(buffer)) > -1) {
+            if (len > 0) {
+                digest.update(buffer, 0, len);
+            }
+        }
+        return digest.digest();
     }
 
     @Override

Reply via email to