[ 
https://issues.apache.org/jira/browse/NIFI-2854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671278#comment-15671278
 ] 

ASF GitHub Bot commented on NIFI-2854:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1202#discussion_r88304118
  
    --- Diff: 
nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/SchemaRecordReader.java
 ---
    @@ -0,0 +1,196 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.repository.schema;
    +
    +import java.io.DataInputStream;
    +import java.io.EOFException;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.nio.charset.StandardCharsets;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +
    +
    +public class SchemaRecordReader {
    +    private final RecordSchema schema;
    +
    +    public SchemaRecordReader(final RecordSchema schema) {
    +        this.schema = schema;
    +    }
    +
    +    public static SchemaRecordReader fromSchema(final RecordSchema schema) 
{
    +        return new SchemaRecordReader(schema);
    +    }
    +
    +    private static void fillBuffer(final InputStream in, final byte[] 
destination) throws IOException {
    +        int bytesRead = 0;
    +        int len;
    +        while (bytesRead < destination.length) {
    +            len = in.read(destination, bytesRead, destination.length - 
bytesRead);
    +            if (len < 0) {
    +                throw new EOFException();
    +            }
    +
    +            bytesRead += len;
    +        }
    +    }
    +
    +    public Record readRecord(final InputStream in) throws IOException {
    +        final int sentinelByte = in.read();
    +        if (sentinelByte < 0) {
    +            return null;
    +        }
    +
    +        if (sentinelByte != 1) {
    +            throw new IOException("Expected to read a Sentinel Byte of '1' 
but got a value of '" + sentinelByte + "' instead");
    +        }
    +
    +        final List<RecordField> schemaFields = schema.getFields();
    +        final Map<RecordField, Object> fields = new 
HashMap<>(schemaFields.size());
    +
    +        for (final RecordField field : schema.getFields()) {
    +            final Object value = readField(in, field);
    +            fields.put(field, value);
    +        }
    +
    +        return new FieldMapRecord(fields, schema);
    +    }
    +
    +
    +    private Object readField(final InputStream in, final RecordField 
field) throws IOException {
    +        switch (field.getRepetition()) {
    +            case ZERO_OR_MORE: {
    +                // If repetition is 0+ then that means we have a list and 
need to read how many items are in the list.
    +                final int iterations = readInt(in);
    +                if (iterations == 0) {
    +                    return Collections.emptyList();
    +                }
    +
    +                final List<Object> value = new ArrayList<>(iterations);
    +                for (int i = 0; i < iterations; i++) {
    +                    value.add(readFieldValue(in, field.getFieldType(), 
field.getFieldName(), field.getSubFields()));
    +                }
    +
    +                return value;
    +            }
    +            case ZERO_OR_ONE: {
    +                // If repetition is 0 or 1 (optional), then check if next 
byte is a 0, which means field is absent or 1, which means
    +                // field is present. Otherwise, throw an Exception.
    +                final int nextByte = in.read();
    +                if (nextByte == -1) {
    +                    throw new EOFException("Unexpected End-of-File when 
attempting to read Repetition value for field '" + field.getFieldName() + "'");
    +                }
    +                if (nextByte == 0) {
    +                    return null;
    +                }
    +                if (nextByte != 1) {
    +                    throw new IOException("Invalid Boolean value found 
when reading 'Repetition' of field '" + field.getFieldName() + "'. Expected 0 
or 1 but got " + (nextByte & 0xFF));
    +                }
    +            }
    +        }
    +
    +        return readFieldValue(in, field.getFieldType(), 
field.getFieldName(), field.getSubFields());
    +    }
    +
    +
    +    private Object readFieldValue(final InputStream in, final FieldType 
fieldType, final String fieldName, final List<RecordField> subFields) throws 
IOException {
    +        switch (fieldType) {
    +            case BOOLEAN: {
    +                final DataInputStream dis = new DataInputStream(in);
    +                return dis.readBoolean();
    +            }
    +            case INT: {
    +                return readInt(in);
    +            }
    +            case LONG: {
    +                final DataInputStream dis = new DataInputStream(in);
    +                return dis.readLong();
    +            }
    +            case STRING: {
    +                final DataInputStream dis = new DataInputStream(in);
    +                return dis.readUTF();
    +            }
    +            case LONG_STRING: {
    +                final int length = readInt(in);
    +                final byte[] buffer = new byte[length];
    +                fillBuffer(in, buffer);
    +                return new String(buffer, StandardCharsets.UTF_8);
    +            }
    +            case BYTE_ARRAY: {
    +                final int length = readInt(in);
    +                final byte[] buffer = new byte[length];
    +                fillBuffer(in, buffer);
    +                return buffer;
    +            }
    +            case MAP: {
    +                final int numEntries = readInt(in);
    +                final RecordField keyField = subFields.get(0);
    +                final RecordField valueField = subFields.get(1);
    +
    +                final Map<Object, Object> entries = new 
HashMap<>(numEntries);
    +                for (int i = 0; i < numEntries; i++) {
    +                    final Object key = readField(in, keyField);
    +                    final Object value = readField(in, valueField);
    +                    entries.put(key, value);
    +                }
    +
    +                return entries;
    +            }
    +            case COMPLEX: {
    +                final int numSubFields = subFields.size();
    +                final Map<RecordField, Object> subFieldValues = new 
HashMap<>(numSubFields);
    +                for (int i = 0; i < numSubFields; i++) {
    +                    final Object subFieldValue = readField(in, 
subFields.get(i));
    +                    subFieldValues.put(subFields.get(i), subFieldValue);
    +                }
    +
    +                return new FieldMapRecord(subFieldValues, new 
RecordSchema(subFields));
    +            }
    +            case UNION: {
    +                final DataInputStream dis = new DataInputStream(in);
    +                final String childFieldType = dis.readUTF();
    +                final Optional<RecordField> fieldOption = 
subFields.stream().filter(field -> 
field.getFieldName().equals(childFieldType)).findFirst();
    +                if (!fieldOption.isPresent()) {
    +                    throw new IOException("Found a field of type '" + 
childFieldType + "' but that was not in the expected list of types");
    +                }
    +
    +                final RecordField matchingField = fieldOption.get();
    +                return readField(in, matchingField);
    +            }
    +            default: {
    +                throw new IOException("Unrecognized Field Type " + 
fieldType + " for field '" + fieldName + "'");
    +            }
    +        }
    +    }
    +
    +    private int readInt(final InputStream in) throws IOException {
    +        final byte[] buffer = new byte[4];
    +        fillBuffer(in, buffer);
    +
    +        final int value = ((buffer[0] & 0xFF) << 24) +
    +            ((buffer[1] & 0xFF) << 16) +
    +            ((buffer[2] & 0xFF) << 8) +
    +            (buffer[3] & 0xFF);
    +
    +        return value;
    --- End diff --
    
    We should probably fall back on using ByteBuffer (just to avoid extra code 
to maintain)
    The above could be eliminated with this:
    ```
    return ByteBuffer.wrap(buffer).getInt();
    ```


> Enable repositories to support upgrades and rollback in well defined scenarios
> ------------------------------------------------------------------------------
>
>                 Key: NIFI-2854
>                 URL: https://issues.apache.org/jira/browse/NIFI-2854
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>             Fix For: 1.1.0
>
>
> The flowfile, swapfile, provenance, and content repositories play a very 
> important role in NiFi's ability to be safely upgraded and rolled back.  We 
> need to have well documented behaviors, designs, and version adherence so 
> that users can safely rely on these mechanisms.
> Once this is formalized and in place we should update our versioning guidance 
> to reflect this as well.
> The following would be true from NiFi 1.2.0 onward
> * No changes to how the repositories are persisted to disk can be made which 
> will break forward/backward compatibility and specifically this means that 
> things like the way each is serialized to disk cannot change.
> * If changes are made which impact forward or backward compatibility they 
> should be reserved for major releases only and should include a utility to 
> help users with pre-existing data convert from some older format to the newer 
> format.  It may not be feasible to have rollback on major releases.
> * The content repository should not be changed within a major release cycle 
> in any way that will harm forward or backward compatibility.
> * The flow file repository can change in that new fields can be added to 
> existing write ahead log record types but no fields can be removed nor can 
> any new types be added.  Once a field is considered required it must remain 
> required.  Changes may only be made across minor version changes - not 
> incremental releases.
> * Swap File storage should follow very similar rules to the flow file 
> repository.  Adding a schema to the swap file header may allow some variation 
> there but the variation should only be hints to optimize how they're 
> processed and not change their behavior otherwise. Changes are only permitted 
> during minor version releases.
> * Provenance repository changes are only permitted during minor version 
> releases.  These changes may include adding or removing fields from existing 
> event types.  If a field is considered required it must always be considered 
> required.  If a field is removed then it must not be a required field and 
> there must be a sensible default an older version could use if that value is 
> not found in new data once rolled back.  New event types may be added.  
> Fields or event types not known to older version, if seen after a rollback, 
> will simply be ignored.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to