[ https://issues.apache.org/jira/browse/NIFI-2854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15658579#comment-15658579 ]
ASF GitHub Bot commented on NIFI-2854:
--------------------------------------
Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1202#discussion_r87674718
--- Diff: nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/serialization/CompressableRecordReader.java ---
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance.serialization;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.StandardProvenanceEventRecord;
+import org.apache.nifi.provenance.StandardRecordReader;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class CompressableRecordReader implements RecordReader {
+ private static final Logger logger = LoggerFactory.getLogger(StandardRecordReader.class);
+
+ private final ByteCountingInputStream rawInputStream;
+ private final String filename;
+ private final int serializationVersion;
+ private final boolean compressed;
+ private final TocReader tocReader;
+ private final int headerLength;
+ private final int maxAttributeChars;
+
+ private DataInputStream dis;
+ private ByteCountingInputStream byteCountingIn;
+
+ public CompressableRecordReader(final InputStream in, final String filename, final int maxAttributeChars) throws IOException {
+ this(in, filename, null, maxAttributeChars);
+ }
+
+ public CompressableRecordReader(final InputStream in, final String filename, final TocReader tocReader, final int maxAttributeChars) throws IOException {
+ logger.trace("Creating RecordReader for {}", filename);
+
+ rawInputStream = new ByteCountingInputStream(in);
+ this.maxAttributeChars = maxAttributeChars;
+
+ final InputStream limitedStream;
+ if (tocReader == null) {
+ limitedStream = rawInputStream;
+ } else {
+ final long offset1 = tocReader.getBlockOffset(1);
+ if (offset1 < 0) {
+ limitedStream = rawInputStream;
+ } else {
+ limitedStream = new LimitingInputStream(rawInputStream, offset1 - rawInputStream.getBytesConsumed());
+ }
+ }
+
+ final InputStream readableStream;
+ if (filename.endsWith(".gz")) {
+ readableStream = new BufferedInputStream(new GZIPInputStream(limitedStream));
+ compressed = true;
+ } else {
+ readableStream = new BufferedInputStream(limitedStream);
+ compressed = false;
+ }
+
+ byteCountingIn = new ByteCountingInputStream(readableStream);
+ dis = new DataInputStream(byteCountingIn);
+
+ final String repoClassName = dis.readUTF();
+ final int serializationVersion = dis.readInt();
+ headerLength = repoClassName.getBytes(StandardCharsets.UTF_8).length + 2 + 4; // 2 bytes for string length, 4 for integer.
+
+ if (serializationVersion < 1 || serializationVersion > 9) {
+ throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion + " and supported versions are 1-9");
--- End diff ---
Wouldn't this be version 10?
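For context, the guard under discussion can be sketched in isolation. The constant names below are illustrative, not from the PR; the diff as posted hardcodes the 1-9 range, which is exactly what the comment above is questioning, so naming the bounds makes the intended maximum explicit:

```java
// Illustrative sketch of the serialization-version guard from the diff.
// MIN/MAX constant names are hypothetical; the PR inlines the 1-9 range.
class VersionGuard {
    static final int MIN_SUPPORTED_VERSION = 1;
    static final int MAX_SUPPORTED_VERSION = 9;

    // Rejects header versions outside the range this reader understands.
    static int validate(final int serializationVersion) {
        if (serializationVersion < MIN_SUPPORTED_VERSION || serializationVersion > MAX_SUPPORTED_VERSION) {
            throw new IllegalArgumentException("Unable to deserialize record because the version is "
                + serializationVersion + " and supported versions are "
                + MIN_SUPPORTED_VERSION + "-" + MAX_SUPPORTED_VERSION);
        }
        return serializationVersion;
    }
}
```

With named bounds, bumping the supported maximum is a one-line change and the error message stays consistent automatically.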
> Enable repositories to support upgrades and rollback in well defined scenarios
> ------------------------------------------------------------------------------
>
> Key: NIFI-2854
> URL: https://issues.apache.org/jira/browse/NIFI-2854
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Mark Payne
> Fix For: 1.1.0
>
>
> The flowfile, swapfile, provenance, and content repositories play a very
> important role in NiFi's ability to be safely upgraded and rolled back. We
> need to have well documented behaviors, designs, and version adherence so
> that users can safely rely on these mechanisms.
> Once this is formalized and in place we should update our versioning guidance
> to reflect this as well.
> The following would be true from NiFi 1.2.0 onward
> * No changes may be made to how the repositories are persisted to disk that
> would break forward/backward compatibility; specifically, things like the way
> each is serialized to disk cannot change.
> * If changes are made which impact forward or backward compatibility they
> should be reserved for major releases only and should include a utility to
> help users with pre-existing data convert from some older format to the newer
> format. It may not be feasible to have rollback on major releases.
> * The content repository should not be changed within a major release cycle
> in any way that will harm forward or backward compatibility.
> * The flowfile repository can change in that new fields can be added to
> existing write-ahead log record types, but no fields can be removed, nor can
> any new types be added. Once a field is considered required it must remain
> required. Such changes may only be made across minor version releases - not
> incremental ones.
> * Swap file storage should follow very similar rules to the flowfile
> repository. Adding a schema to the swap file header may allow some variation
> there, but that variation should only consist of hints to optimize how swap
> files are processed, not changes to their behavior otherwise. Changes are only
> permitted during minor version releases.
> * Provenance repository changes are only permitted during minor version
> releases. These changes may include adding or removing fields from existing
> event types. If a field is considered required it must always be considered
> required. If a field is removed then it must not be a required field, and
> there must be a sensible default an older version could use if that value is
> not found in new data once rolled back. New event types may be added.
> Fields or event types not known to an older version, if seen after a rollback,
> will simply be ignored.
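The provenance rule above (a removed or optional field must have a sensible default an older reader can fall back to) can be sketched as follows. The record layout, class, and field names here are hypothetical illustrations, not NiFi's actual serialization:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

// Hypothetical reader tolerating a trailing field that only newer writers
// emit; records written before the field existed yield a sensible default,
// so a rolled-back version can still read new data and vice versa.
class TolerantEventReader {
    static final String DEFAULT_DETAILS = ""; // default for pre-field data

    // Record layout (illustrative): eventId (long), then an optional
    // "details" string in modified-UTF-8 form.
    static String readDetails(final byte[] record) throws IOException {
        final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(record));
        dis.readLong(); // event id, present in every version
        try {
            return dis.readUTF(); // field added in a newer serialization version
        } catch (final EOFException eof) {
            return DEFAULT_DETAILS; // record predates the field
        }
    }
}
```

A reader written this way satisfies both directions of the rule: old records read under the new version get the default, and because the field is trailing and optional, its absence never makes a record unreadable.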
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)