Repository: incubator-nifi Updated Branches: refs/heads/develop fed987e77 -> 05cc6f045
NIFI-212: When comparing .tar files for differences, rather than comparing MD5SUM's of .tar files, ensure that .tar files have the same number of entries and that those entries are identical. This prevents issues that arise if the TAR metadata within the file differs Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/05cc6f04 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/05cc6f04 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/05cc6f04 Branch: refs/heads/develop Commit: 05cc6f045d056e3be3f9c71dbc6f7400a4c01f08 Parents: fed987e Author: Mark Payne <[email protected]> Authored: Tue Dec 30 10:00:24 2014 -0500 Committer: Mark Payne <[email protected]> Committed: Tue Dec 30 10:00:24 2014 -0500 ---------------------------------------------------------------------- .../nifi/cluster/flow/impl/DataFlowDaoImpl.java | 63 +++++++++++++++++--- 1 file changed, 56 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05cc6f04/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java index b7b142b..72b594a 100644 --- a/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java +++ b/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/flow/impl/DataFlowDaoImpl.java @@ -24,6 +24,9 @@ import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; 
+import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; import java.util.UUID; import javax.xml.bind.JAXBContext; @@ -40,6 +43,10 @@ import javax.xml.transform.TransformerFactory; import javax.xml.transform.dom.DOMSource; import javax.xml.transform.stream.StreamResult; +import org.apache.commons.compress.archivers.ArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; import org.apache.nifi.cluster.flow.ClusterDataFlow; import org.apache.nifi.cluster.flow.DaoException; import org.apache.nifi.cluster.flow.DataFlowDao; @@ -48,16 +55,12 @@ import org.apache.nifi.cluster.protocol.DataFlow; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.cluster.protocol.StandardDataFlow; import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter; -import org.apache.nifi.util.file.FileUtils; +import org.apache.nifi.logging.NiFiLog; import org.apache.nifi.stream.io.BufferedInputStream; import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.stream.io.ByteArrayInputStream; import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.logging.NiFiLog; - -import org.apache.commons.compress.archivers.tar.TarArchiveEntry; -import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; -import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.apache.nifi.util.file.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.w3c.dom.Document; @@ -174,14 +177,60 @@ public class DataFlowDaoImpl implements DataFlowDao { FileUtils.copyFile(restoreFlowStateFile, primaryFlowStateFile, false, false, logger); } else { // sync the primary copy with the restore copy - FileUtils.syncWithRestore(primaryFlowStateFile, restoreFlowStateFile, logger); + 
syncWithRestore(primaryFlowStateFile, restoreFlowStateFile); } } } catch (final IOException | IllegalArgumentException | IllegalStateException | JAXBException ex) { throw new DaoException(ex); } + } + + + /** Compares the primary and restore flow state files as TAR archives. Opens both files, asks each archive for its first entry via getNextEntry(), returns normally if neither archive yields an entry, and throws IllegalStateException if exactly one archive yields an entry or if the MD5 digests that calculateMd5 computes over the two archive streams differ. Nothing is copied or written by this method. @param primaryFile the primary copy of the file. @param restoreFile the restore copy of the file. @throws IOException if either file cannot be opened or read. @throws IllegalStateException if the two files are found to differ. NOTE(review): getNextEntry() is called only once per archive; presumably TarArchiveInputStream.read() reports end-of-stream at the end of the current entry, in which case only the content of the first entry is compared and neither the number of entries nor any later entry is checked - confirm against the Commons Compress documentation. NOTE(review): the exception messages format getAbsoluteFile() for the primary file but getAbsolutePath() for the restore file - these were probably meant to match. */ private void syncWithRestore(final File primaryFile, final File restoreFile) throws IOException { + try (final FileInputStream primaryFis = new FileInputStream(primaryFile); + final TarArchiveInputStream primaryIn = new TarArchiveInputStream(primaryFis); + final FileInputStream restoreFis = new FileInputStream(restoreFile); + final TarArchiveInputStream restoreIn = new TarArchiveInputStream(restoreFis)) { + + final ArchiveEntry primaryEntry = primaryIn.getNextEntry(); + final ArchiveEntry restoreEntry = restoreIn.getNextEntry(); + + if ( primaryEntry == null && restoreEntry == null ) { + return; + } + if ( (primaryEntry == null && restoreEntry != null) || (primaryEntry != null && restoreEntry == null) ) { + throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'", + primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath())); + } + + final byte[] primaryMd5 = calculateMd5(primaryIn); + final byte[] restoreMd5 = calculateMd5(restoreIn); + + if ( !Arrays.equals(primaryMd5, restoreMd5) ) { + throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'", + primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath())); + } + } + } + + /** Reads the given stream in 8192-byte chunks until read() returns a negative value and returns the MD5 digest of all bytes read. The stream is not closed here. @param in the stream to digest. @return the MD5 digest bytes. @throws IOException if reading fails; a NoSuchAlgorithmException from MessageDigest.getInstance("MD5") is also rethrown wrapped in an IOException. */ private byte[] calculateMd5(final InputStream in) throws IOException { + final MessageDigest digest; + try { + digest = MessageDigest.getInstance("MD5"); + } catch (final NoSuchAlgorithmException nsae) { + throw new IOException(nsae); + } + + int len; + final byte[] buffer = new byte[8192]; + while ((len = in.read(buffer)) > -1) { + if (len > 0) { + digest.update(buffer, 0, len); + } + } + return digest.digest(); } @Override
