[
https://issues.apache.org/jira/browse/NIFI-2854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671555#comment-15671555
]
ASF GitHub Bot commented on NIFI-2854:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1202#discussion_r88325963
--- Diff:
nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.repository.schema;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+
public class SchemaRecordReader {

    // Schema describing the layout of every record this reader decodes.
    private final RecordSchema schema;

    /**
     * Creates a reader that decodes records serialized according to the given schema.
     *
     * @param schema the schema describing the serialized record structure
     */
    public SchemaRecordReader(final RecordSchema schema) {
        this.schema = schema;
    }
+
+ public static SchemaRecordReader fromSchema(final RecordSchema schema)
{
+ return new SchemaRecordReader(schema);
+ }
+
+ private static void fillBuffer(final InputStream in, final byte[]
destination) throws IOException {
+ int bytesRead = 0;
+ int len;
+ while (bytesRead < destination.length) {
+ len = in.read(destination, bytesRead, destination.length -
bytesRead);
+ if (len < 0) {
+ throw new EOFException();
+ }
+
+ bytesRead += len;
+ }
+ }
+
+ public Record readRecord(final InputStream in) throws IOException {
+ final int sentinelByte = in.read();
+ if (sentinelByte < 0) {
+ return null;
+ }
+
+ if (sentinelByte != 1) {
+ throw new IOException("Expected to read a Sentinel Byte of '1'
but got a value of '" + sentinelByte + "' instead");
+ }
+
+ final List<RecordField> schemaFields = schema.getFields();
+ final Map<RecordField, Object> fields = new
HashMap<>(schemaFields.size());
+
+ for (final RecordField field : schema.getFields()) {
+ final Object value = readField(in, field);
+ fields.put(field, value);
+ }
+
+ return new FieldMapRecord(fields, schema);
+ }
+
+
    /**
     * Reads the value of a single field, honoring the field's repetition:
     * ZERO_OR_MORE yields a List, ZERO_OR_ONE may yield null, and otherwise
     * exactly one value is read.
     *
     * @param in the stream to read from
     * @param field the field to read
     * @return the field value: a List for ZERO_OR_MORE, null for an absent
     *         ZERO_OR_ONE field, or a single value otherwise
     * @throws EOFException if the stream ends while reading an optional field's presence byte
     * @throws IOException if the presence byte is neither 0 nor 1, or the value cannot be read
     */
    private Object readField(final InputStream in, final RecordField field) throws IOException {
        switch (field.getRepetition()) {
            case ZERO_OR_MORE: {
                // If repetition is 0+ then that means we have a list and need to read how many items are in the list.
                final int iterations = readInt(in);
                if (iterations == 0) {
                    return Collections.emptyList();
                }

                final List<Object> value = new ArrayList<>(iterations);
                for (int i = 0; i < iterations; i++) {
                    value.add(readFieldValue(in, field.getFieldType(), field.getFieldName(), field.getSubFields()));
                }

                return value;
            }
            case ZERO_OR_ONE: {
                // If repetition is 0 or 1 (optional), then check if next byte is a 0, which means field is absent or 1, which means
                // field is present. Otherwise, throw an Exception.
                final int nextByte = in.read();
                if (nextByte == -1) {
                    throw new EOFException("Unexpected End-of-File when attempting to read Repetition value for field '" + field.getFieldName() + "'");
                }
                if (nextByte == 0) {
                    return null;
                }
                if (nextByte != 1) {
                    throw new IOException("Invalid Boolean value found when reading 'Repetition' of field '" + field.getFieldName() + "'. Expected 0 or 1 but got " + (nextByte & 0xFF));
                }
                // NOTE: deliberate fall-through out of the switch — a present optional
                // field is read below exactly like a required one.
            }
        }

        // Reached for EXACTLY_ONE repetition and for a present ZERO_OR_ONE field.
        return readFieldValue(in, field.getFieldType(), field.getFieldName(), field.getSubFields());
    }
+
+
+ private Object readFieldValue(final InputStream in, final FieldType
fieldType, final String fieldName, final List<RecordField> subFields) throws
IOException {
+ switch (fieldType) {
+ case BOOLEAN: {
+ final DataInputStream dis = new DataInputStream(in);
+ return dis.readBoolean();
+ }
+ case INT: {
+ return readInt(in);
+ }
+ case LONG: {
+ final DataInputStream dis = new DataInputStream(in);
+ return dis.readLong();
+ }
+ case STRING: {
+ final DataInputStream dis = new DataInputStream(in);
+ return dis.readUTF();
+ }
+ case LONG_STRING: {
+ final int length = readInt(in);
+ final byte[] buffer = new byte[length];
+ fillBuffer(in, buffer);
+ return new String(buffer, StandardCharsets.UTF_8);
+ }
+ case BYTE_ARRAY: {
+ final int length = readInt(in);
+ final byte[] buffer = new byte[length];
+ fillBuffer(in, buffer);
+ return buffer;
+ }
+ case MAP: {
+ final int numEntries = readInt(in);
+ final RecordField keyField = subFields.get(0);
+ final RecordField valueField = subFields.get(1);
+
+ final Map<Object, Object> entries = new
HashMap<>(numEntries);
+ for (int i = 0; i < numEntries; i++) {
+ final Object key = readField(in, keyField);
+ final Object value = readField(in, valueField);
+ entries.put(key, value);
+ }
+
+ return entries;
+ }
+ case COMPLEX: {
+ final int numSubFields = subFields.size();
+ final Map<RecordField, Object> subFieldValues = new
HashMap<>(numSubFields);
+ for (int i = 0; i < numSubFields; i++) {
+ final Object subFieldValue = readField(in,
subFields.get(i));
+ subFieldValues.put(subFields.get(i), subFieldValue);
+ }
+
+ return new FieldMapRecord(subFieldValues, new
RecordSchema(subFields));
+ }
+ case UNION: {
+ final DataInputStream dis = new DataInputStream(in);
+ final String childFieldType = dis.readUTF();
+ final Optional<RecordField> fieldOption =
subFields.stream().filter(field ->
field.getFieldName().equals(childFieldType)).findFirst();
+ if (!fieldOption.isPresent()) {
+ throw new IOException("Found a field of type '" +
childFieldType + "' but that was not in the expected list of types");
+ }
+
+ final RecordField matchingField = fieldOption.get();
+ return readField(in, matchingField);
+ }
+ default: {
+ throw new IOException("Unrecognized Field Type " +
fieldType + " for field '" + fieldName + "'");
+ }
+ }
+ }
+
+ private int readInt(final InputStream in) throws IOException {
+ final byte[] buffer = new byte[4];
+ fillBuffer(in, buffer);
+
+ final int value = ((buffer[0] & 0xFF) << 24) +
+ ((buffer[1] & 0xFF) << 16) +
+ ((buffer[2] & 0xFF) << 8) +
+ (buffer[3] & 0xFF);
+
+ return value;
--- End diff --
That's a good call. Will update.
> Enable repositories to support upgrades and rollback in well defined scenarios
> ------------------------------------------------------------------------------
>
> Key: NIFI-2854
> URL: https://issues.apache.org/jira/browse/NIFI-2854
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Mark Payne
> Fix For: 1.1.0
>
>
> The flowfile, swapfile, provenance, and content repositories play a very
> important role in NiFi's ability to be safely upgraded and rolled back. We
> need to have well documented behaviors, designs, and version adherence so
> that users can safely rely on these mechanisms.
> Once this is formalized and in place we should update our versioning guidance
> to reflect this as well.
> The following would be true from NiFi 1.2.0 onward
> * No changes to how the repositories are persisted to disk can be made which
> will break forward/backward compatibility and specifically this means that
> things like the way each is serialized to disk cannot change.
> * If changes are made which impact forward or backward compatibility they
> should be reserved for major releases only and should include a utility to
> help users with pre-existing data convert from some older format to the newer
> format. It may not be feasible to have rollback on major releases.
> * The content repository should not be changed within a major release cycle
> in any way that will harm forward or backward compatibility.
> * The flow file repository can change in that new fields can be added to
> existing write ahead log record types but no fields can be removed nor can
> any new types be added. Once a field is considered required it must remain
> required. Changes may only be made across minor version changes - not
> incremental.
> * Swap File storage should follow very similar rules to the flow file
> repository. Adding a schema to the swap file header may allow some variation
> there but the variation should only be hints to optimize how they're
> processed and not change their behavior otherwise. Changes are only permitted
> during minor version releases.
> * Provenance repository changes are only permitted during minor version
> releases. These changes may include adding or removing fields from existing
> event types. If a field is considered required it must always be considered
> required. If a field is removed then it must not be a required field and
> there must be a sensible default an older version could use if that value is
> not found in new data once rolled back. New event types may be added.
> Fields or event types not known to older version, if seen after a rollback,
> will simply be ignored.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)