Repository: nifi Updated Branches: refs/heads/master 141334c3c -> 0f2ac39f6
http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java new file mode 100644 index 0000000..7e87199 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.repository.claim; + +public class StandardResourceClaim implements ResourceClaim, Comparable<ResourceClaim> { + private final StandardResourceClaimManager claimManager; + private final String id; + private final String container; + private final String section; + private final boolean lossTolerant; + private final int hashCode; + private volatile boolean writable = true; + + public StandardResourceClaim(final StandardResourceClaimManager claimManager, final String container, final String section, final String id, final boolean lossTolerant) { + this.claimManager = claimManager; + this.container = container.intern(); + this.section = section.intern(); + this.id = id; + this.lossTolerant = lossTolerant; + + hashCode = 17 + 19 * id.hashCode() + 19 * container.hashCode() + 19 * section.hashCode(); + } + + @Override + public boolean isLossTolerant() { + return lossTolerant; + } + + /** + * @return the unique identifier for this claim + */ + @Override + public String getId() { + return id; + } + + /** + * @return the container identifier in which this claim is held + */ + @Override + public String getContainer() { + return container; + } + + /** + * @return the section within a given container the claim is held + */ + @Override + public String getSection() { + return section; + } + + @Override + public boolean equals(final Object other) { + if (this == other) { + return true; + } + + if (other == null) { + return false; + } + if (hashCode != other.hashCode()) { + // We check hash code before instanceof because instanceof is fairly expensive and for + // StandardResourceClaim, calling hashCode() simply returns a pre-calculated value. 
+ return false; + } + + if (!(other instanceof ResourceClaim)) { + return false; + } + final ResourceClaim otherClaim = (ResourceClaim) other; + return id.equals(otherClaim.getId()) && container.equals(otherClaim.getContainer()) && section.equals(otherClaim.getSection()); + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public String toString() { + return "StandardResourceClaim[id=" + id + ", container=" + container + ", section=" + section + "]"; + } + + @Override + public boolean isWritable() { + return writable; + } + + /** + * Freeze the Resource Claim so that it can now longer be written to + */ + void freeze() { + this.writable = false; + } + + @Override + public boolean isInUse() { + // Note that it is critical here that we always check isWritable() BEFORE checking + // the claimant count. This is due to the fact that if the claim is in fact writable, the claimant count + // could increase. So if we first check claimant count and that is 0, and then we check isWritable, it may be + // that the claimant count has changed to 1 before checking isWritable. + // However, if isWritable() is false, then the only way that the claimant count can increase is if a FlowFile referencing + // the Resource Claim is cloned. In this case, though, the claimant count has not become 0. + // Said another way, if isWritable() == false, then the claimant count can never increase from 0. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.controller.repository.claim;

import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Standard implementation of {@link ResourceClaimManager}. Tracks, for each Resource Claim,
 * how many FlowFiles currently reference it, and maintains a queue of "destructable" claims
 * whose backing content may be removed from the Content Repository.
 *
 * <p>Thread-safe: per-claim updates are guarded by synchronizing on the claim itself so that
 * the count check and the queue/map manipulation happen atomically for a given claim.</p>
 */
public class StandardResourceClaimManager implements ResourceClaimManager {

    private static final Logger logger = LoggerFactory.getLogger(StandardResourceClaimManager.class);

    // Maps each claim to its reference count. The ClaimCount also retains the canonical claim
    // instance so that getResourceClaim() can return the managed object rather than a lookup key.
    private final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts = new ConcurrentHashMap<>();

    // Bounded so that a slow content-cleanup consumer applies back-pressure to producers.
    private final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000);

    @Override
    public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant, final boolean writable) {
        final StandardResourceClaim claim = new StandardResourceClaim(this, container, section, id, lossTolerant);
        if (!writable) {
            claim.freeze();
        }
        return claim;
    }

    @Override
    public ResourceClaim getResourceClaim(final String container, final String section, final String id) {
        // Equality of claims is based only on (container, section, id), so a throwaway claim
        // works as a lookup key; return the canonical, managed instance if one exists.
        final ResourceClaim tempClaim = new StandardResourceClaim(this, container, section, id, false);
        final ClaimCount count = claimantCounts.get(tempClaim);
        return (count == null) ? null : count.getClaim();
    }

    /**
     * Returns the counter for the given claim, creating (and registering) a zero-valued
     * counter if none exists yet. Returns null only when the claim itself is null.
     */
    private AtomicInteger getCounter(final ResourceClaim claim) {
        if (claim == null) {
            return null;
        }

        ClaimCount counter = claimantCounts.get(claim);
        if (counter != null) {
            return counter.getCount();
        }

        counter = new ClaimCount(claim, new AtomicInteger(0));
        // putIfAbsent resolves the race where two threads create a counter concurrently.
        final ClaimCount existingCounter = claimantCounts.putIfAbsent(claim, counter);
        return existingCounter == null ? counter.getCount() : existingCounter.getCount();
    }

    @Override
    public int getClaimantCount(final ResourceClaim claim) {
        if (claim == null) {
            return 0;
        }

        synchronized (claim) {
            final ClaimCount counter = claimantCounts.get(claim);
            return counter == null ? 0 : counter.getCount().get();
        }
    }

    @Override
    public int decrementClaimantCount(final ResourceClaim claim) {
        if (claim == null) {
            return 0;
        }

        synchronized (claim) {
            final ClaimCount counter = claimantCounts.get(claim);
            if (counter == null) {
                logger.warn("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim);
                return -1;
            }

            final int newClaimantCount = counter.getCount().decrementAndGet();
            if (newClaimantCount < 0) {
                // Going negative indicates an unbalanced increment/decrement somewhere upstream.
                logger.error("Decremented claimant count for {} to {}", claim, newClaimantCount);
            } else {
                logger.debug("Decrementing claimant count for {} to {}", claim, newClaimantCount);
            }

            // If the claim is no longer referenced, we want to remove it. We consider the claim to be "no longer referenced"
            // if the count is 0 and it is no longer writable (if it's writable, it may still be writable by the Content Repository,
            // even though no existing FlowFile is referencing the claim).
            if (newClaimantCount == 0 && !claim.isWritable()) {
                removeClaimantCount(claim);
            }
            return newClaimantCount;
        }
    }

    // protected so that it can be used in unit tests
    protected void removeClaimantCount(final ResourceClaim claim) {
        claimantCounts.remove(claim);
    }

    @Override
    public int incrementClaimantCount(final ResourceClaim claim) {
        return incrementClaimantCount(claim, false);
    }

    @Override
    public int incrementClaimantCount(final ResourceClaim claim, final boolean newClaim) {
        if (claim == null) {
            return 0;
        }

        synchronized (claim) {
            final AtomicInteger counter = getCounter(claim);

            final int newClaimantCount = counter.incrementAndGet();
            logger.debug("Incrementing claimant count for {} to {}", claim, newClaimantCount);

            // If the claimant count moved from 0 to 1, remove it from the queue of destructable claims.
            if (!newClaim && newClaimantCount == 1) {
                destructableClaims.remove(claim);
            }
            return newClaimantCount;
        }
    }

    @Override
    public void markDestructable(final ResourceClaim claim) {
        if (claim == null) {
            return;
        }

        synchronized (claim) {
            if (getClaimantCount(claim) > 0) {
                return;
            }

            logger.debug("Marking claim {} as destructable", claim);
            try {
                // Block (with back-pressure) until the bounded queue accepts the claim.
                while (!destructableClaims.offer(claim, 30, TimeUnit.MINUTES)) {
                }
            } catch (final InterruptedException ie) {
                // Restore the interrupt status so that callers up the stack can observe and
                // react to the interruption; swallowing it would silently lose the signal.
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public void drainDestructableClaims(final Collection<ResourceClaim> destination, final int maxElements) {
        final int drainedCount = destructableClaims.drainTo(destination, maxElements);
        logger.debug("Drained {} destructable claims to {}", drainedCount, destination);
    }

    @Override
    public void drainDestructableClaims(final Collection<ResourceClaim> destination, final int maxElements, final long timeout, final TimeUnit unit) {
        try {
            // poll() blocks up to the timeout for the first element; the remainder is drained
            // without blocking so the overall wait is bounded by the supplied timeout.
            final ResourceClaim firstClaim = destructableClaims.poll(timeout, unit);
            if (firstClaim != null) {
                destination.add(firstClaim);
                destructableClaims.drainTo(destination, maxElements - 1);
            }
        } catch (final InterruptedException e) {
            // Restore the interrupt status rather than swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void purge() {
        claimantCounts.clear();
    }

    @Override
    public void freeze(final ResourceClaim claim) {
        if (claim == null) {
            return;
        }

        if (!(claim instanceof StandardResourceClaim)) {
            throw new IllegalArgumentException("The given resource claim is not managed by this Resource Claim Manager");
        }

        ((StandardResourceClaim) claim).freeze();

        synchronized (claim) {
            // A frozen claim with no claimants can never be referenced again, so drop it.
            if (getClaimantCount(claim) == 0) {
                claimantCounts.remove(claim);
            }
        }
    }


    /**
     * Pairs a claim with its reference count so that the canonical claim instance can be
     * retrieved from the map alongside its counter.
     */
    private static final class ClaimCount {
        private final ResourceClaim claim;
        private final AtomicInteger count;

        public ClaimCount(final ResourceClaim claim, final AtomicInteger count) {
            this.claim = claim;
            this.count = count;
        }

        public AtomicInteger getCount() {
            return count;
        }

        public ResourceClaim getClaim() {
            return claim;
        }
    }
}
a/nifi-toolkit/nifi-toolkit-flowfile-repo/pom.xml b/nifi-toolkit/nifi-toolkit-flowfile-repo/pom.xml new file mode 100644 index 0000000..946b195 --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-flowfile-repo/pom.xml @@ -0,0 +1,27 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. 
--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-toolkit</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-toolkit-flowfile-repo</artifactId> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-toolkit/nifi-toolkit-flowfile-repo/src/main/java/org/apache/nifi/toolkit/repos/flowfile/RepairCorruptedFileEndings.java ---------------------------------------------------------------------- diff --git a/nifi-toolkit/nifi-toolkit-flowfile-repo/src/main/java/org/apache/nifi/toolkit/repos/flowfile/RepairCorruptedFileEndings.java b/nifi-toolkit/nifi-toolkit-flowfile-repo/src/main/java/org/apache/nifi/toolkit/repos/flowfile/RepairCorruptedFileEndings.java new file mode 100644 index 0000000..d911a9d --- /dev/null +++ b/nifi-toolkit/nifi-toolkit-flowfile-repo/src/main/java/org/apache/nifi/toolkit/repos/flowfile/RepairCorruptedFileEndings.java @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.toolkit.repos.flowfile;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Command-line utility that repairs a FlowFile Repository whose journal files end in a run of
 * NUL (0x00) bytes, which can occur after a sudden power loss. It copies the repository to a
 * new directory, truncating the trailing NUL bytes from each partition's journal files so that
 * NiFi can recover the repository on restart.
 */
public class RepairCorruptedFileEndings {
    private static final Pattern PARTITION_FILE_PATTERN = Pattern.compile("partition\\-\\d+");

    // Journal files are scanned backwards in chunks of this size when counting trailing NULs.
    private static final int BUFFER_SIZE = 4096;

    private static void printUsage() {
        System.out.println("Whenever a sudden power loss occurs, it is common with some operating systems for files that are being written to ");
        System.out.println("to contain many NUL characters (hex 0) at the end of the file upon restart. If this happens to the FlowFile repository, ");
        System.out.println("NiFi will be unable to recover, because it cannot properly read the repository. This utility attempts to read the FlowFile ");
        System.out.println("Repository and write out a new copy of the repository, where the new copy does not contain the trailing NUL characters so ");
        System.out.println("NiFi can be restarted by pointing at the new FlowFile Repository.");
        System.out.println("Typically, this problem can be identified by seeing an error in the NiFi logs at startup, indicating either:");
        System.out.println();
        System.out.println("Caused by: java.io.IOException: Expected to read a Sentinel Byte of '1' but got a value of '0' instead");
        System.out.println();
        System.out.println("or:");
        System.out.println();
        System.out.println("Caused by: java.lang.IllegalArgumentException: No enum constant org.wali.UpdateType.");
        System.out.println();
        System.out.println();
        System.out.println("Usage:");
        System.out.println("java " + RepairCorruptedFileEndings.class.getCanonicalName() + " <repo input directory> <repo destination directory>");
        System.out.println();
        System.out.println("<repo input directory>: The existing FlowFile Repository Directory that contains corrupt data");
        System.out.println("<repo destination directory>: The directory to write the repaired repository to");
        System.out.println();
    }

    public static void main(final String[] args) {
        if (args.length != 2) {
            printUsage();
            return;
        }

        final File inputDir = new File(args[0]);
        if (!inputDir.exists()) {
            System.out.println("Input Repository Directory " + inputDir + " does not exist");
            return;
        }

        final File[] inputFiles = inputDir.listFiles();
        if (inputFiles == null) {
            System.out.println("Could not access files within input Repository Directory " + inputDir);
            return;
        }

        final List<File> partitionDirs = Stream.of(inputFiles)
            .filter(RepairCorruptedFileEndings::isPartitionDirectory)
            .collect(Collectors.toList());

        if (partitionDirs.isEmpty()) {
            System.out.println("Found no partitions within input Repository Directory " + inputDir);
            return;
        }

        final File outputDir = new File(args[1]);
        if (outputDir.exists()) {
            final File[] children = outputDir.listFiles();
            if (children == null) {
                System.out.println("Cannot access output Repository Directory " + outputDir);
                return;
            }

            if (children.length > 0) {
                System.out.println("Output Repository Directory " + outputDir + " already exists and has files or sub-directories. "
                    + "The output directory must either not exist or be empty.");
                return;
            }
        } else if (!outputDir.mkdirs()) {
            System.out.println("Failed to create output Repository Directory " + outputDir);
            return;
        }

        // Copy everything that is not a partition directory (e.g. snapshot files) verbatim,
        // except the lock file, which must not be carried over to the new repository.
        final List<File> nonPartitionDirFiles = Stream.of(inputFiles)
            .filter(f -> !isPartitionDirectory(f))
            .filter(f -> !f.getName().equals("wali.lock"))
            .collect(Collectors.toList());

        for (final File nonPartitionFile : nonPartitionDirFiles) {
            final File destination = new File(outputDir, nonPartitionFile.getName());
            try {
                copy(nonPartitionFile, destination);
            } catch (final IOException e) {
                System.out.println("Failed to copy source file " + nonPartitionFile + " to destination file " + destination);
                e.printStackTrace();
            }
        }

        int fullCopies = 0;
        int partialCopies = 0;

        for (final File partitionDir : partitionDirs) {
            final File[] partitionFiles = partitionDir.listFiles();
            if (partitionFiles == null) {
                System.out.println("Could not access children of input sub-directory " + partitionDir);
                return;
            }

            final File outputPartitionDir = new File(outputDir, partitionDir.getName());
            if (!outputPartitionDir.mkdirs()) {
                System.out.println("Failed to create output directory " + outputPartitionDir);
                return;
            }

            for (final File partitionFile : partitionFiles) {
                final File destinationFile = new File(outputPartitionDir, partitionFile.getName());

                // All journal files follow the pattern of:
                // <journal entry> <TRANSACTION_CONTINUE | TRANSACTION_COMMIT> <journal entry> <TRANSACTION_CONTINUE | TRANSACTION_COMMIT> ...
                // The TRANSACTION_CONTINUE byte is a 1 while the TRANSACTION_COMMIT byte is a 2. So if we have 0's at the end then we know
                // that we can simply truncate up until the point where we encounter the first of the of the trailing zeroes. At that point,
                // we know that we are done. It is possible that the repo will still be 'corrupt' in that only part of a transaction was
                // written out. However, this is okay because the repo will recover from this on restart. What it does NOT properly recover
                // from on restart is when the file ends with a bunch of 0's because it believes that the Transaction ID is zero and then
                // it reads in 0 bytes for the "Update Type" and as a result we get an invalid enum name because it thinks that the name of
                // the UpdateType is an empty string because it's a string of length 0.
                final int trailingZeroes;
                try {
                    trailingZeroes = countTrailingZeroes(partitionFile);
                } catch (final Exception e) {
                    System.out.println("Failed to read input file " + partitionFile);
                    e.printStackTrace();
                    return;
                }

                if (trailingZeroes > 0) {
                    final long goodLength = partitionFile.length() - trailingZeroes;

                    try {
                        copy(partitionFile, destinationFile, goodLength);
                        partialCopies++;
                    } catch (final Exception e) {
                        System.out.println("Failed to copy " + goodLength + " bytes from " + partitionFile + " to " + destinationFile);
                        e.printStackTrace();
                        return;
                    }
                } else {
                    try {
                        copy(partitionFile, destinationFile);
                    } catch (final Exception e) {
                        System.out.println("Failed to copy entire file from " + partitionFile + " to " + destinationFile);
                        e.printStackTrace();
                        return;
                    }

                    fullCopies++;
                }
            }
        }

        System.out.println("Successfully copied " + fullCopies + " journal files fully and truncated " + partialCopies + " journal files in output directory");
    }

    private static boolean isPartitionDirectory(final File file) {
        return PARTITION_FILE_PATTERN.matcher(file.getName()).matches();
    }

    /**
     * Copies a file or (recursively) a directory to the given destination.
     */
    private static void copy(final File input, final File destination) throws IOException {
        if (input.isFile()) {
            copyFile(input, destination);
        } else {
            copyDirectory(input, destination);
        }
    }

    private static void copyDirectory(final File input, final File destination) throws IOException {
        if (!destination.exists() && !destination.mkdirs()) {
            System.out.println("Failed to copy input directory " + input + " to destination because destination directory " + destination
                + " does not exist and could not be created");
            return;
        }

        final File[] children = input.listFiles();
        if (children == null) {
            System.out.println("Failed to copy input directory " + input + " to destination because could not access files of input directory");
            return;
        }

        for (final File child : children) {
            final File destinationChild = new File(destination, child.getName());
            copy(child, destinationChild);
        }
    }

    private static void copyFile(final File input, final File destination) throws IOException {
        if (!input.exists()) {
            return;
        }

        Files.copy(input.toPath(), destination.toPath(), StandardCopyOption.COPY_ATTRIBUTES);
    }

    /**
     * Copies the first {@code length} bytes of {@code input} to {@code destination},
     * truncating anything beyond that point (i.e., the trailing NUL bytes).
     */
    private static void copy(final File input, final File destination, final long length) throws IOException {
        try (final InputStream fis = new FileInputStream(input);
            final OutputStream fos = new FileOutputStream(destination)) {

            final byte[] buffer = new byte[8192];
            long remaining = length;
            while (remaining > 0L) {
                final int read = fis.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                if (read < 0) {
                    // Source ended early; copy whatever was available.
                    break;
                }
                fos.write(buffer, 0, read);
                remaining -= read;
            }
        }
    }

    /**
     * Counts the number of consecutive 0x00 bytes at the end of the given file by scanning
     * backwards in 4 KB chunks.
     *
     * @param partitionFile the journal file to inspect
     * @return the number of trailing NUL bytes (0 if the file does not end with a NUL)
     * @throws IOException if the file cannot be read
     */
    static int countTrailingZeroes(final File partitionFile) throws IOException {
        // try-with-resources ensures the file handle is closed even if a read fails.
        try (final RandomAccessFile raf = new RandomAccessFile(partitionFile, "r")) {
            long startPos = partitionFile.length() - BUFFER_SIZE;

            int count = 0;
            boolean reachedStartOfFile = false;
            while (!reachedStartOfFile) {
                int bufferLength = BUFFER_SIZE;

                if (startPos < 0) {
                    // Fewer than BUFFER_SIZE bytes remain; shrink the read to the file's head.
                    bufferLength = (int) (startPos + BUFFER_SIZE);
                    startPos = 0;
                    reachedStartOfFile = true;
                }

                raf.seek(startPos);

                final byte[] buffer = new byte[bufferLength];
                final int read = fillBuffer(raf, buffer);

                // Walk the chunk backwards; the first non-zero byte ends the run.
                for (int i = read - 1; i >= 0; i--) {
                    if (buffer[i] == 0) {
                        count++;
                    } else {
                        return count;
                    }
                }

                startPos -= BUFFER_SIZE;
            }

            return count;
        }
    }


    /**
     * Reads from {@code source} until {@code destination} is full or EOF is reached.
     *
     * @return the number of bytes actually read
     */
    private static int fillBuffer(final RandomAccessFile source, final byte[] destination) throws IOException {
        int bytesRead = 0;
        int len;
        while (bytesRead < destination.length) {
            len = source.read(destination, bytesRead, destination.length - bytesRead);
            if (len < 0) {
                break;
            }

            bytesRead += len;
        }

        return bytesRead;
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.toolkit.repos.flowfile;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;

import static org.junit.Assert.assertEquals;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

import org.junit.Test;

/**
 * Tests for {@link RepairCorruptedFileEndings#countTrailingZeroes(File)}, covering files that
 * are smaller than, equal to, and larger than the 4 KB scan buffer, with and without trailing
 * NUL bytes.
 */
public class TestRepairCorruptedFileEndings {
    private final File targetFile = new File("target/1.bin");

    @Before
    @After
    public void cleanup() {
        if (targetFile.exists()) {
            Assert.assertTrue(targetFile.delete());
        }
    }

    /**
     * Writes a file of {@code totalSize} bytes to {@link #targetFile} whose first
     * {@code dataSize} bytes are 'A' and whose remainder is NUL (0x00) bytes.
     */
    private void writeTestFile(final int totalSize, final int dataSize) throws IOException {
        final byte[] contents = new byte[totalSize];
        for (int i = 0; i < dataSize; i++) {
            contents[i] = 'A';
        }

        Files.write(targetFile.toPath(), contents, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);
    }

    @Test
    public void testEndsWithZeroesGreaterThanBufferSize() throws IOException {
        writeTestFile(4096 + 8, 4096);
        assertEquals(8, RepairCorruptedFileEndings.countTrailingZeroes(targetFile));
    }

    @Test
    public void testEndsWithZeroesSmallerThanBufferSize() throws IOException {
        writeTestFile(1024, 1020);
        assertEquals(4, RepairCorruptedFileEndings.countTrailingZeroes(targetFile));
    }

    @Test
    public void testEndsWithZeroesEqualToBufferSize() throws IOException {
        writeTestFile(4096, 4090);
        assertEquals(6, RepairCorruptedFileEndings.countTrailingZeroes(targetFile));
    }


    @Test
    public void testAllZeroesGreaterThanBufferSize() throws IOException {
        writeTestFile(4096 + 8, 0);
        assertEquals(4096 + 8, RepairCorruptedFileEndings.countTrailingZeroes(targetFile));
    }

    @Test
    public void testAllZeroesEqualToBufferSize() throws IOException {
        writeTestFile(4096, 0);
        assertEquals(4096, RepairCorruptedFileEndings.countTrailingZeroes(targetFile));
    }

    @Test
    public void testAllZeroesSmallerThanBufferSize() throws IOException {
        writeTestFile(1024, 0);
        assertEquals(1024, RepairCorruptedFileEndings.countTrailingZeroes(targetFile));
    }


    @Test
    public void testSmallerThanBufferSize() throws IOException {
        writeTestFile(1024, 1020);
        assertEquals(4, RepairCorruptedFileEndings.countTrailingZeroes(targetFile));
    }

    @Test
    public void testSmallerThanBufferSizeNoTrailingZeroes() throws IOException {
        writeTestFile(1024, 1024);
        assertEquals(0, RepairCorruptedFileEndings.countTrailingZeroes(targetFile));
    }


    @Test
    public void testLargerThanBufferSizeNoTrailingZeroes() throws IOException {
        writeTestFile(8192, 8192);
        assertEquals(0, RepairCorruptedFileEndings.countTrailingZeroes(targetFile));
    }


    @Test
    public void testEqualToBufferSizeNoTrailingZeroes() throws IOException {
        writeTestFile(4096, 4096);
        assertEquals(0, RepairCorruptedFileEndings.countTrailingZeroes(targetFile));
    }

}
The ASF licenses this file to -You under the Apache License, Version 2.0 (the "License"); you may not use -this file except in compliance with the License. You may obtain a copy of -the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required -by applicable law or agreed to in writing, software distributed under the -License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS -OF ANY KIND, either express or implied. See the License for the specific -language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache</groupId> @@ -87,7 +88,7 @@ language governing permissions and limitations under the License. 
--> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <maven.min-version>3.1.0</maven.min-version> - <maven.surefire.arguments/> + <maven.surefire.arguments /> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <inceptionYear>2014</inceptionYear> @@ -330,9 +331,9 @@ language governing permissions and limitations under the License. --> <artifactId>quartz</artifactId> <version>2.2.1</version> <exclusions> - <!-- | Exclude the quartz 2.2.1 bundled version of c3p0 because it is - lgpl licensed | We also don't use the JDBC related features of quartz for - which the dependency would matter --> + <!-- | Exclude the quartz 2.2.1 bundled version of c3p0 + because it is lgpl licensed | We also don't use the JDBC related features + of quartz for which the dependency would matter --> <exclusion> <groupId>c3p0</groupId> <artifactId>c3p0</artifactId> @@ -401,8 +402,8 @@ language governing permissions and limitations under the License. --> <artifactId>spring-core</artifactId> <version>${spring.version}</version> <exclusions> - <!-- <artifactId>jcl-over-slf4j</artifactId> is used in dependencies - section --> + <!-- <artifactId>jcl-over-slf4j</artifactId> is used + in dependencies section --> <exclusion> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> @@ -818,7 +819,7 @@ language governing permissions and limitations under the License. --> <version>${org.slf4j.version}</version> </dependency> - + <!-- Test Dependencies for testing interactions with ZooKeeper --> <dependency> <groupId>org.apache.curator</groupId> @@ -832,9 +833,9 @@ language governing permissions and limitations under the License. 
--> <version>6.8.8</version> <scope>test</scope> </dependency> - - - <dependency> + + + <dependency> <groupId>org.jsoup</groupId> <artifactId>jsoup</artifactId> <version>1.8.3</version> @@ -872,6 +873,16 @@ language governing permissions and limitations under the License. --> </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-flowfile-repo-serialization</artifactId> + <version>1.2.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-repository-models</artifactId> + <version>1.2.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-custom-ui-utilities</artifactId> <version>1.2.0-SNAPSHOT</version> </dependency> @@ -1253,37 +1264,37 @@ language governing permissions and limitations under the License. --> <artifactId>nifi-elasticsearch-nar</artifactId> <version>1.2.0-SNAPSHOT</version> <type>nar</type> - </dependency> - <dependency> + </dependency> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-elasticsearch-5-nar</artifactId> - <version>1.2.0-SNAPSHOT</version> + <version>1.2.0-SNAPSHOT</version> <type>nar</type> - </dependency> - <dependency> + </dependency> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-lumberjack-nar</artifactId> <version>1.2.0-SNAPSHOT</version> <type>nar</type> </dependency> - <dependency> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-beats-nar</artifactId> <version>1.2.0-SNAPSHOT</version> <type>nar</type> </dependency> - <dependency> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-email-nar</artifactId> <version>1.2.0-SNAPSHOT</version> <type>nar</type> </dependency> - <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-tcp-nar</artifactId> - <version>1.2.0-SNAPSHOT</version> - <type>nar</type> - </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-tcp-nar</artifactId> + 
<version>1.2.0-SNAPSHOT</version> + <type>nar</type> + </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-splunk-nar</artifactId> @@ -1308,7 +1319,7 @@ language governing permissions and limitations under the License. --> <version>1.2.0-SNAPSHOT</version> <type>nar</type> </dependency> - <dependency> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-site-to-site-reporting-nar</artifactId> <version>1.2.0-SNAPSHOT</version> @@ -1600,7 +1611,9 @@ language governing permissions and limitations under the License. --> <include>**/*Spec.class</include> </includes> <redirectTestOutputToFile>true</redirectTestOutputToFile> - <argLine combine.children="append">-Xmx1G -Djava.net.preferIPv4Stack=true ${maven.surefire.arguments}</argLine> + <argLine combine.children="append">-Xmx1G + -Djava.net.preferIPv4Stack=true + ${maven.surefire.arguments}</argLine> </configuration> <dependencies> <dependency> @@ -1804,71 +1817,100 @@ language governing permissions and limitations under the License. --> <!-- Checks for whitespace --> <!-- See http://checkstyle.sf.net/config_whitespace.html --> <module name="FileTabCharacter"> - <property name="eachLine" value="true" /> + <property name="eachLine" + value="true" /> </module> <module name="TreeWalker"> <module name="RegexpSinglelineJava"> - <property name="format" value="\s+$" /> - <property name="message" value="Line has trailing whitespace." /> + <property name="format" + value="\s+$" /> + <property name="message" + value="Line has trailing whitespace." /> </module> <module name="RegexpSinglelineJava"> - <property name="format" value="[@]see\s+[{][@]link" /> - <property name="message" value="Javadoc @see does not need @link: pick one or the other." /> + <property name="format" + value="[@]see\s+[{][@]link" /> + <property name="message" + value="Javadoc @see does not need @link: pick one or the other." 
/> </module> <module name="OuterTypeFilename" /> <module name="LineLength"> - <!-- needs extra, because Eclipse formatter ignores the ending left - brace --> + <!-- needs extra, because Eclipse formatter + ignores the ending left brace --> <property name="max" value="200" /> - <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://" /> + <property name="ignorePattern" + value="^package.*|^import.*|a href|href|http://|https://|ftp://" /> </module> <module name="AvoidStarImport" /> <module name="UnusedImports"> - <property name="processJavadoc" value="true" /> + <property name="processJavadoc" + value="true" /> </module> <module name="NoLineWrap" /> <module name="LeftCurly"> - <property name="maxLineLength" value="160" /> + <property name="maxLineLength" + value="160" /> </module> <module name="RightCurly" /> <module name="RightCurly"> - <property name="option" value="alone" /> - <property name="tokens" value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, INSTANCE_INIT" /> + <property name="option" + value="alone" /> + <property name="tokens" + value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, INSTANCE_INIT" /> </module> <module name="SeparatorWrap"> - <property name="tokens" value="DOT" /> - <property name="option" value="nl" /> + <property name="tokens" + value="DOT" /> + <property name="option" + value="nl" /> </module> <module name="SeparatorWrap"> - <property name="tokens" value="COMMA" /> - <property name="option" value="EOL" /> + <property name="tokens" + value="COMMA" /> + <property name="option" + value="EOL" /> </module> <module name="PackageName"> - <property name="format" value="^[a-z]+(\.[a-z][a-zA-Z0-9]*)*$" /> + <property name="format" + value="^[a-z]+(\.[a-z][a-zA-Z0-9]*)*$" /> </module> <module name="MethodTypeParameterName"> - <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)" /> + <property name="format" + 
value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)" /> </module> <module name="MethodParamPad" /> <module name="OperatorWrap"> - <property name="option" value="NL" /> - <property name="tokens" value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD, NOT_EQUAL, QUESTION, SL, SR, STAR " /> + <property name="option" + value="NL" /> + <property name="tokens" + value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD, NOT_EQUAL, QUESTION, SL, SR, STAR " /> </module> <module name="AnnotationLocation"> - <property name="tokens" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF" /> + <property name="tokens" + value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF" /> </module> <module name="AnnotationLocation"> - <property name="tokens" value="VARIABLE_DEF" /> - <property name="allowSamelineMultipleAnnotations" value="true" /> + <property name="tokens" + value="VARIABLE_DEF" /> + <property + name="allowSamelineMultipleAnnotations" + value="true" /> </module> <module name="NonEmptyAtclauseDescription" /> <module name="JavadocMethod"> - <property name="allowMissingJavadoc" value="true" /> - <property name="allowMissingParamTags" value="true" /> - <property name="allowMissingThrowsTags" value="true" /> - <property name="allowMissingReturnTag" value="true" /> - <property name="allowedAnnotations" value="Override,Test,BeforeClass,AfterClass,Before,After" /> - <property name="allowThrowsTagsForSubclasses" value="true" /> + <property name="allowMissingJavadoc" + value="true" /> + <property name="allowMissingParamTags" + value="true" /> + <property name="allowMissingThrowsTags" + value="true" /> + <property name="allowMissingReturnTag" + value="true" /> + <property name="allowedAnnotations" + value="Override,Test,BeforeClass,AfterClass,Before,After" /> + <property + name="allowThrowsTagsForSubclasses" + value="true" /> </module> <module name="SingleLineJavadoc" /> </module> @@ 
-1908,9 +1950,10 @@ language governing permissions and limitations under the License. --> </build> <profiles> <profile> - <!-- Performs execution of Integration Tests using the Maven FailSafe Plugin. The view of integration tests in this context - are those tests interfacing with external sources and services requiring additional resources or credentials that cannot - be explicitly provided. --> + <!-- Performs execution of Integration Tests using the Maven + FailSafe Plugin. The view of integration tests in this context are those + tests interfacing with external sources and services requiring additional + resources or credentials that cannot be explicitly provided. --> <id>integration-tests</id> <build> <plugins> @@ -1930,12 +1973,12 @@ language governing permissions and limitations under the License. --> </build> </profile> <profile> - <!-- Checks style and licensing requirements. This is a good idea to run - for contributions and for the release process. While it would be nice to - run always these plugins can considerably slow the build and have proven - to create unstable builds in our multi-module project and when building using - multiple threads. The stability issues seen with Checkstyle in multi-module - builds include false-positives and false negatives. --> + <!-- Checks style and licensing requirements. This is a good + idea to run for contributions and for the release process. While it would + be nice to run always these plugins can considerably slow the build and have + proven to create unstable builds in our multi-module project and when building + using multiple threads. The stability issues seen with Checkstyle in multi-module + builds include false-positives and false negatives. --> <id>contrib-check</id> <build> <plugins> @@ -1991,14 +2034,16 @@ language governing permissions and limitations under the License. 
--> </pluginManagement> </build> </profile> - <!-- The following profiles are here as a convenience for folks that want to build against vendor-specific - distributions of the various Hadoop ecosystem libraries. These will alter which dependencies are sourced - in a manner that can adjust the correct LICENSE and NOTICE requirements for any affected jar and the - resulting assembly overall. These L&N impacts are not automatically handled by the build process and are - the responsibility of those creating and using the resulting binary artifacts. --> + <!-- The following profiles are here as a convenience for folks that + want to build against vendor-specific distributions of the various Hadoop + ecosystem libraries. These will alter which dependencies are sourced in a + manner that can adjust the correct LICENSE and NOTICE requirements for any + affected jar and the resulting assembly overall. These L&N impacts are not + automatically handled by the build process and are the responsibility of + those creating and using the resulting binary artifacts. --> <profile> - <!-- This profile adds the Hortonworks repository for resolving Hortonworks Data Platform (HDP) - artifacts for the Hadoop bundles --> + <!-- This profile adds the Hortonworks repository for resolving + Hortonworks Data Platform (HDP) artifacts for the Hadoop bundles --> <id>hortonworks</id> <repositories> <repository> @@ -2033,15 +2078,13 @@ language governing permissions and limitations under the License. 
--> </repository> </repositories> <properties> - <!-- Vendor-specific version number included here as default, should be overridden on the - command-line - <hadoop.version>2.7.1.2.4.0.0-169</hadoop.version> - --> + <!-- Vendor-specific version number included here as default, + should be overridden on the command-line <hadoop.version>2.7.1.2.4.0.0-169</hadoop.version> --> </properties> </profile> <profile> - <!-- This profile will add the MapR repository for resolving MapR Hadoop - artifacts for the Hadoop bundles --> + <!-- This profile will add the MapR repository for resolving + MapR Hadoop artifacts for the Hadoop bundles --> <id>mapr</id> <repositories> <repository> @@ -2057,15 +2100,13 @@ language governing permissions and limitations under the License. --> </repository> </repositories> <properties> - <!-- Vendor-specific version number included here as default, should be overridden on the - command-line - <hadoop.version>2.7.0-mapr-1602</hadoop.version> - --> + <!-- Vendor-specific version number included here as default, + should be overridden on the command-line <hadoop.version>2.7.0-mapr-1602</hadoop.version> --> </properties> </profile> <profile> - <!-- This profile will add the Cloudera repository for resolving Cloudera Distribution of Hadoop (CDH) - artifacts for the Hadoop bundles --> + <!-- This profile will add the Cloudera repository for resolving + Cloudera Distribution of Hadoop (CDH) artifacts for the Hadoop bundles --> <id>cloudera</id> <repositories> <repository> @@ -2081,10 +2122,8 @@ language governing permissions and limitations under the License. 
--> </repository> </repositories> <properties> - <!-- Vendor-specific version number included here as default, should be overridden on the - command-line - <hadoop.version>2.6.0-cdh5.8.1</hadoop.version> - --> + <!-- Vendor-specific version number included here as default, + should be overridden on the command-line <hadoop.version>2.6.0-cdh5.8.1</hadoop.version> --> </properties> </profile> </profiles>
