[
https://issues.apache.org/jira/browse/NIFI-2854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671263#comment-15671263
]
ASF GitHub Bot commented on NIFI-2854:
--------------------------------------
Github user olegz commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1202#discussion_r88302396
--- Diff:
nifi-commons/nifi-schema-utils/src/main/java/org/apache/nifi/repository/schema/RecordSchema.java
---
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.repository.schema;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class RecordSchema {
+ private static final String FIELD_NAME = "Field Name";
+ private static final String FIELD_TYPE = "Field Type";
+ private static final String REPETITION = "Repetition";
+ private static final String SUBFIELDS = "SubFields";
+
+ private static final String STRING_TYPE = "String";
+ private static final String INT_TYPE = "Integer";
+ private static final String LONG_TYPE = "Long";
+ private static final String SUBFIELD_TYPE = "SubFieldList";
+
+ private final List<RecordField> fields;
+
+ public RecordSchema(final List<RecordField> fields) {
+ this.fields = fields;
+ }
+
+ public RecordSchema(final RecordField... fields) {
+ this(Arrays.asList(fields));
+ }
+
+ public List<RecordField> getFields() {
+ return fields;
+ }
+
+ public RecordField getField(final String fieldName) {
+ return fields.stream()
+ .filter(field -> field.getFieldName().equals(fieldName))
+ .findFirst()
+ .orElse(null);
+ }
+
+ public void writeTo(final OutputStream out) throws IOException {
+ try {
+ final DataOutputStream dos = (out instanceof DataOutputStream)
? (DataOutputStream) out : new DataOutputStream(out);
+
+ dos.writeInt(fields.size());
+ for (final RecordField field : fields) {
+ writeField(field, dos);
+ }
+ } catch (final IOException ioe) {
+ throw new IOException("Unable to write Record Schema to
stream", ioe);
+ }
+ }
+
+ private void writeField(final RecordField field, final
DataOutputStream dos) throws IOException {
+ dos.writeInt(4); // A field is made up of 4 "elements": Field
Name, Field Type, Field Repetition, Sub-Fields.
+
+ // For each of the elements, we write a String indicating the
Element Name, a String indicating the Element Type, and
+ // finally the Element data itself.
+ dos.writeUTF(FIELD_NAME);
+ dos.writeUTF(STRING_TYPE);
+ dos.writeUTF(field.getFieldName());
+
+ dos.writeUTF(FIELD_TYPE);
+ dos.writeUTF(STRING_TYPE);
+ dos.writeUTF(field.getFieldType().name());
+
+ dos.writeUTF(REPETITION);
+ dos.writeUTF(STRING_TYPE);
+ dos.writeUTF(field.getRepetition().name());
+
+ dos.writeUTF(SUBFIELDS);
+ dos.writeUTF(SUBFIELD_TYPE);
+ final List<RecordField> subFields = field.getSubFields();
+ dos.writeInt(subFields.size()); // SubField is encoded as number
of Sub-Fields followed by the fields themselves.
+ for (final RecordField subField : subFields) {
+ writeField(subField, dos);
+ }
+ }
+
+ public static RecordSchema readFrom(final InputStream in) throws
IOException {
+ try {
+ final DataInputStream dis = (in instanceof DataInputStream) ?
(DataInputStream) in : new DataInputStream(in);
+
+ final int numFields = dis.readInt();
+ final List<RecordField> fields = new ArrayList<>(numFields);
+
+ for (int i = 0; i < numFields; i++) {
+ final RecordField field = readField(dis);
+ fields.add(field);
+ }
+
+ return new RecordSchema(fields);
+ } catch (final IOException ioe) {
+ throw new IOException("Unable to read Record Schema from
stream", ioe);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static RecordField readField(final DataInputStream dis) throws
IOException {
+ final Map<String, Object> schemaFieldMap = new HashMap<>();
+ final int numElementsToRead = dis.readInt();
+ for (int i = 0; i < numElementsToRead; i++) {
+ final String fieldName = dis.readUTF();
+ final String typeName = dis.readUTF();
+ Object fieldValue = null;
+
+ switch (typeName) {
+ case STRING_TYPE:
+ fieldValue = dis.readUTF();
+ break;
+ case INT_TYPE:
+ fieldValue = dis.readInt();
+ break;
+ case LONG_TYPE:
+ fieldValue = dis.readLong();
+ break;
+ case SUBFIELD_TYPE: {
+ final int numFields = dis.readInt();
+ final List<RecordField> subFields = new
ArrayList<>(numFields);
+ for (int j = 0; j < numFields; j++) {
+ subFields.add(readField(dis));
+ }
+ fieldValue = subFields;
+ break;
+ }
+ default: {
+ throw new IOException("Cannot read schema because the
schema definition contains a field named '"
+ + fieldName + "' with a Field Type of '" +
typeName + "', which is not a known Field Type");
+ }
+ }
+
+ schemaFieldMap.put(fieldName, fieldValue);
+ }
+
+ final String fieldName = (String) schemaFieldMap.get(FIELD_NAME);
+ final String fieldTypeName = (String)
schemaFieldMap.get(FIELD_TYPE);
+ final String repetitionName = (String)
schemaFieldMap.get(REPETITION);
+ List<RecordField> subFields = (List<RecordField>)
schemaFieldMap.get(SUBFIELDS);
+ if (subFields == null) {
+ subFields = Collections.emptyList();
+ }
+
+ final Repetition repetition = Repetition.valueOf(repetitionName);
+ if (FieldType.COMPLEX.name().equals(fieldTypeName)) {
+ return new ComplexRecordField(fieldName, repetition,
subFields);
+ } else if (FieldType.UNION.name().equals(fieldTypeName)) {
+ return new UnionRecordField(fieldName, repetition, subFields);
+ } else if (FieldType.MAP.name().equals(fieldTypeName)) {
+ if (subFields.size() != 2) {
+ throw new IOException("Found a Map that did not have a
'Key' field and a 'Value' field but instead had " + subFields.size() + "
fields: " + subFields);
--- End diff --
IMHO message is a bit confusing. I'd say "Field of type Map must only
contain 2 elements representing key/value. Was. . . ."
> Enable repositories to support upgrades and rollback in well defined scenarios
> ------------------------------------------------------------------------------
>
> Key: NIFI-2854
> URL: https://issues.apache.org/jira/browse/NIFI-2854
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Mark Payne
> Fix For: 1.1.0
>
>
> The flowfile, swapfile, provenance, and content repositories play a very
> important roll in NiFi's ability to be safely upgraded and rolled back. We
> need to have well documented behaviors, designs, and version adherence so
> that users can safely rely on these mechanisms.
> Once this is formalized and in place we should update our versioning guidance
> to reflect this as well.
> The following would be true from NiFi 1.2.0 onward
> * No changes to how the repositories are persisted to disk can be made which
> will break forward/backward compatibility and specifically this means that
> things like the way each is serialized to disk cannot change.
> * If changes are made which impact forward or backward compatibility they
> should be reserved for major releases only and should include a utility to
> help users with pre-existing data convert from some older format to the newer
> format. It may not be feasible to have rollback on major releases.
> * The content repository should not be changed within a major release cycle
> in any way that will harm forward or backward compatibility.
> * The flow file repository can change in that new fields can be added to
> existing write ahead log record types but no fields can be removed nor can
> any new types be added. Once a field is considered required it must remain
> required. Changes may only be made across minor version changes - not
> incremental.
> * Swap File storage should follow very similar rules to the flow file
> repository. Adding a schema to the swap file header may allow some variation
> there but the variation should only be hints to optimize how they're
> processed and not change their behavior otherwise. Changes are only permitted
> during minor version releases.
> * Provenance repository changes are only permitted during minor version
> releases. These changes may include adding or removing fields from existing
> event types. If a field is considered required it must always be considered
> required. If a field is removed then it must not be a required field and
> there must be a sensible default an older version could use if that value is
> not found in new data once rolled back. New event types may be added.
> Fields or event types not known to older version, if seen after a rollback,
> will simply be ignored.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)