NIFI-3273: Added nifi-toolkit-flowfile-repo that contains a simple Java class that is capable of recovering a FlowFile Repository manually in the case of an operating system crash that results in trailing 0's being dumped into the edit logs. Also refactored flowfile repo into some independent modules so that additional capabilities can be added in the future to examine the flowfile repo
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0207f21c Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0207f21c Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0207f21c Branch: refs/heads/master Commit: 0207f21ce4bb68536843d65ecc9110ec834028cd Parents: 141334c Author: Mark Payne <[email protected]> Authored: Wed Mar 22 13:08:30 2017 -0400 Committer: joewitt <[email protected]> Committed: Wed Apr 19 21:44:23 2017 -0700 ---------------------------------------------------------------------- .../nifi-flowfile-repo-serialization/pom.xml | 44 ++ .../repository/RepositoryRecordSerde.java | 68 +++ .../RepositoryRecordSerdeFactory.java | 95 ++++ .../repository/SchemaRepositoryRecordSerde.java | 239 +++++++++ .../WriteAheadRepositoryRecordSerde.java | 517 +++++++++++++++++++ .../repository/schema/ContentClaimFieldMap.java | 125 +++++ .../repository/schema/ContentClaimSchema.java | 63 +++ .../schema/FlowFileRecordFieldMap.java | 99 ++++ .../repository/schema/FlowFileSchema.java | 87 ++++ .../schema/RepositoryRecordFieldMap.java | 83 +++ .../schema/RepositoryRecordSchema.java | 138 +++++ .../schema/RepositoryRecordUpdate.java | 73 +++ .../schema/ResourceClaimFieldMap.java | 85 +++ .../SchemaRepositoryRecordSerdeTest.java | 266 ++++++++++ .../nifi-framework/nifi-framework-core/pom.xml | 8 + .../repository/RepositoryRecordSerde.java | 68 --- .../RepositoryRecordSerdeFactory.java | 95 ---- .../repository/SchemaRepositoryRecordSerde.java | 239 --------- .../repository/StandardFlowFileRecord.java | 341 ------------ .../repository/StandardRepositoryRecord.java | 221 -------- .../WriteAheadRepositoryRecordSerde.java | 517 ------------------- .../repository/claim/StandardContentClaim.java | 105 ---- .../repository/claim/StandardResourceClaim.java | 122 ----- .../claim/StandardResourceClaimManager.java | 219 -------- .../repository/schema/ContentClaimFieldMap.java | 125 ----- 
.../repository/schema/ContentClaimSchema.java | 63 --- .../schema/FlowFileRecordFieldMap.java | 99 ---- .../repository/schema/FlowFileSchema.java | 87 ---- .../schema/RepositoryRecordFieldMap.java | 83 --- .../schema/RepositoryRecordSchema.java | 138 ----- .../schema/RepositoryRecordUpdate.java | 73 --- .../schema/ResourceClaimFieldMap.java | 85 --- .../nifi-repository-models/pom.xml | 44 ++ .../repository/StandardFlowFileRecord.java | 341 ++++++++++++ .../repository/StandardRepositoryRecord.java | 221 ++++++++ .../repository/claim/StandardContentClaim.java | 105 ++++ .../repository/claim/StandardResourceClaim.java | 122 +++++ .../claim/StandardResourceClaimManager.java | 219 ++++++++ .../nifi-framework/pom.xml | 2 + nifi-toolkit/nifi-toolkit-flowfile-repo/pom.xml | 27 + .../flowfile/RepairCorruptedFileEndings.java | 287 ++++++++++ .../TestRepairCorruptedFileEndings.java | 169 ++++++ nifi-toolkit/pom.xml | 1 + pom.xml | 231 +++++---- 44 files changed, 3663 insertions(+), 2776 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml new file mode 100644 index 0000000..02988c0 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/pom.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. 
The ASF licenses this file to + You under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework</artifactId> + <version>1.2.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-flowfile-repo-serialization</artifactId> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-framework-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-repository-models</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-write-ahead-log</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-schema-utils</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java new file mode 100644 index 0000000..44ed62d --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.repository; + +import java.util.Map; + +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.wali.SerDe; +import org.wali.UpdateType; + +public abstract class RepositoryRecordSerde implements SerDe<RepositoryRecord> { + private Map<String, FlowFileQueue> flowFileQueueMap = null; + + protected void setQueueMap(final Map<String, FlowFileQueue> queueMap) { + this.flowFileQueueMap = queueMap; + } + + protected Map<String, FlowFileQueue> getQueueMap() { + return flowFileQueueMap; + } + + protected FlowFileQueue getFlowFileQueue(final String queueId) { + return flowFileQueueMap.get(queueId); + } + + @Override + public Long getRecordIdentifier(final RepositoryRecord record) { + return record.getCurrent().getId(); + } + + @Override + public UpdateType getUpdateType(final RepositoryRecord record) { + switch (record.getType()) { + case CONTENTMISSING: + case DELETE: + return UpdateType.DELETE; + case CREATE: + return UpdateType.CREATE; + case UPDATE: + return UpdateType.UPDATE; + case SWAP_OUT: + return UpdateType.SWAP_OUT; + case SWAP_IN: + return UpdateType.SWAP_IN; + } + return null; + } + + @Override + public String getLocation(final RepositoryRecord record) { + return record.getSwapLocation(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java new file mode 100644 index 0000000..c19fa94 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository; + +import java.util.Map; + +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.wali.SerDe; +import org.wali.SerDeFactory; +import org.wali.UpdateType; + +public class RepositoryRecordSerdeFactory implements SerDeFactory<RepositoryRecord> { + private final String LEGACY_SERDE_ENCODING_NAME = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository$WriteAheadRecordSerde"; + private final ResourceClaimManager resourceClaimManager; + private Map<String, FlowFileQueue> flowFileQueueMap = null; + + public RepositoryRecordSerdeFactory(final ResourceClaimManager claimManager) { + this.resourceClaimManager = claimManager; + } + + protected void setQueueMap(final Map<String, FlowFileQueue> queueMap) { + this.flowFileQueueMap = queueMap; + } + + protected Map<String, FlowFileQueue> getQueueMap() { + return flowFileQueueMap; + } + + @Override + 
public SerDe<RepositoryRecord> createSerDe(final String encodingName) { + if (encodingName == null || SchemaRepositoryRecordSerde.class.getName().equals(encodingName)) { + final SchemaRepositoryRecordSerde serde = new SchemaRepositoryRecordSerde(resourceClaimManager); + serde.setQueueMap(flowFileQueueMap); + return serde; + } + + if (WriteAheadRepositoryRecordSerde.class.getName().equals(encodingName) + || LEGACY_SERDE_ENCODING_NAME.equals(encodingName)) { + final WriteAheadRepositoryRecordSerde serde = new WriteAheadRepositoryRecordSerde(resourceClaimManager); + serde.setQueueMap(flowFileQueueMap); + return serde; + } + + throw new IllegalArgumentException("Cannot create Deserializer for Repository Records because the encoding '" + encodingName + "' is not known"); + } + + protected FlowFileQueue getFlowFileQueue(final String queueId) { + return flowFileQueueMap.get(queueId); + } + + @Override + public Long getRecordIdentifier(final RepositoryRecord record) { + return record.getCurrent().getId(); + } + + @Override + public UpdateType getUpdateType(final RepositoryRecord record) { + switch (record.getType()) { + case CONTENTMISSING: + case DELETE: + return UpdateType.DELETE; + case CREATE: + return UpdateType.CREATE; + case UPDATE: + return UpdateType.UPDATE; + case SWAP_OUT: + return UpdateType.SWAP_OUT; + case SWAP_IN: + return UpdateType.SWAP_IN; + } + return null; + } + + @Override + public String getLocation(final RepositoryRecord record) { + return record.getSwapLocation(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java new file mode 100644 index 0000000..221f8ce --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.repository; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Map; + +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.repository.schema.ContentClaimFieldMap; +import org.apache.nifi.controller.repository.schema.ContentClaimSchema; +import org.apache.nifi.controller.repository.schema.FlowFileSchema; +import org.apache.nifi.controller.repository.schema.RepositoryRecordFieldMap; +import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema; +import org.apache.nifi.controller.repository.schema.RepositoryRecordUpdate; +import org.apache.nifi.repository.schema.FieldType; +import org.apache.nifi.repository.schema.Record; +import org.apache.nifi.repository.schema.RecordSchema; +import org.apache.nifi.repository.schema.Repetition; +import org.apache.nifi.repository.schema.SchemaRecordReader; +import org.apache.nifi.repository.schema.SchemaRecordWriter; +import org.apache.nifi.repository.schema.SimpleRecordField; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wali.SerDe; +import org.wali.UpdateType; + +public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> { + private static final Logger logger = LoggerFactory.getLogger(SchemaRepositoryRecordSerde.class); + private static final int MAX_ENCODING_VERSION = 2; + + private final RecordSchema writeSchema = RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V2; + private final RecordSchema contentClaimSchema = ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1; + + private final ResourceClaimManager resourceClaimManager; + private volatile RecordSchema recoverySchema; + + public SchemaRepositoryRecordSerde(final ResourceClaimManager resourceClaimManager) { + 
this.resourceClaimManager = resourceClaimManager; + } + + @Override + public void writeHeader(final DataOutputStream out) throws IOException { + writeSchema.writeTo(out); + } + + @Override + public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord newRecordState, final DataOutputStream out) throws IOException { + serializeRecord(newRecordState, out); + } + + @Override + public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException { + final RecordSchema schema; + switch (record.getType()) { + case CREATE: + case UPDATE: + schema = RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V2; + break; + case CONTENTMISSING: + case DELETE: + schema = RepositoryRecordSchema.DELETE_SCHEMA_V2; + break; + case SWAP_IN: + schema = RepositoryRecordSchema.SWAP_IN_SCHEMA_V2; + break; + case SWAP_OUT: + schema = RepositoryRecordSchema.SWAP_OUT_SCHEMA_V2; + break; + default: + throw new IllegalArgumentException("Received Repository Record with unknown Update Type: " + record.getType()); // won't happen. 
+ } + + serializeRecord(record, out, schema, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V2); + } + + + protected void serializeRecord(final RepositoryRecord record, final DataOutputStream out, RecordSchema schema, RecordSchema repositoryRecordSchema) throws IOException { + final RepositoryRecordFieldMap fieldMap = new RepositoryRecordFieldMap(record, schema, contentClaimSchema); + final RepositoryRecordUpdate update = new RepositoryRecordUpdate(fieldMap, repositoryRecordSchema); + new SchemaRecordWriter().writeRecord(update, out); + } + + @Override + public void readHeader(final DataInputStream in) throws IOException { + recoverySchema = RecordSchema.readFrom(in); + } + + @Override + public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException { + return deserializeRecord(in, version); + } + + @Override + public RepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { + final SchemaRecordReader reader = SchemaRecordReader.fromSchema(recoverySchema); + final Record updateRecord = reader.readRecord(in); + if (updateRecord == null) { + // null may be returned by reader.readRecord() if it encounters end-of-stream + return null; + } + + // Top level is always going to be a "Repository Record Update" record because we need a 'Union' type record at the + // top level that indicates which type of record we have. 
+ final Record record = (Record) updateRecord.getFieldValue(RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V2); + + final String actionType = (String) record.getFieldValue(RepositoryRecordSchema.ACTION_TYPE_FIELD); + final UpdateType updateType = UpdateType.valueOf(actionType); + switch (updateType) { + case CREATE: + return createRecord(record); + case DELETE: + return deleteRecord(record); + case SWAP_IN: + return swapInRecord(record); + case SWAP_OUT: + return swapOutRecord(record); + case UPDATE: + return updateRecord(record); + default: + throw new IOException("Found unrecognized Update Type '" + actionType + "'"); + } + } + + + @SuppressWarnings("unchecked") + private StandardRepositoryRecord createRecord(final Record record) { + final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); + ffBuilder.id((Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID)); + ffBuilder.entryDate((Long) record.getFieldValue(FlowFileSchema.ENTRY_DATE)); + + final Long lastQueueDate = (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE); + final Long queueDateIndex = (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE_INDEX); + ffBuilder.lastQueued(lastQueueDate, queueDateIndex); + + final Long lineageStartDate = (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_DATE); + final Long lineageStartIndex = (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_INDEX); + ffBuilder.lineageStart(lineageStartDate, lineageStartIndex); + + populateContentClaim(ffBuilder, record); + ffBuilder.size((Long) record.getFieldValue(FlowFileSchema.FLOWFILE_SIZE)); + + ffBuilder.addAttributes((Map<String, String>) record.getFieldValue(FlowFileSchema.ATTRIBUTES)); + + final FlowFileRecord flowFileRecord = ffBuilder.build(); + + final String queueId = (String) record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER); + final FlowFileQueue queue = getFlowFileQueue(queueId); + + final StandardRepositoryRecord repoRecord = new 
StandardRepositoryRecord(queue, flowFileRecord); + requireFlowFileQueue(repoRecord, queueId); + return repoRecord; + } + + private void requireFlowFileQueue(final StandardRepositoryRecord repoRecord, final String queueId) { + if (queueId == null || queueId.trim().isEmpty()) { + logger.warn("{} does not have a Queue associated with it; this record will be discarded", repoRecord.getCurrent()); + repoRecord.markForAbort(); + } else if (repoRecord.getOriginalQueue() == null) { + logger.warn("{} maps to unknown Queue {}; this record will be discarded", repoRecord.getCurrent(), queueId); + repoRecord.markForAbort(); + } + } + + private void populateContentClaim(final StandardFlowFileRecord.Builder ffBuilder, final Record record) { + final Object claimMap = record.getFieldValue(FlowFileSchema.CONTENT_CLAIM); + if (claimMap == null) { + return; + } + + final Record claimRecord = (Record) claimMap; + final ContentClaim contentClaim = ContentClaimFieldMap.getContentClaim(claimRecord, resourceClaimManager); + final Long offset = ContentClaimFieldMap.getContentClaimOffset(claimRecord); + + ffBuilder.contentClaim(contentClaim); + ffBuilder.contentClaimOffset(offset); + } + + private RepositoryRecord updateRecord(final Record record) { + return createRecord(record); + } + + private RepositoryRecord deleteRecord(final Record record) { + final Long recordId = (Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID_FIELD); + final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId); + final FlowFileRecord flowFileRecord = ffBuilder.build(); + + final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord); + repoRecord.markForDelete(); + return repoRecord; + } + + private RepositoryRecord swapInRecord(final Record record) { + final StandardRepositoryRecord repoRecord = createRecord(record); + final String swapLocation = (String) record.getFieldValue(new 
SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE)); + repoRecord.setSwapLocation(swapLocation); + + final String queueId = (String) record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER); + requireFlowFileQueue(repoRecord, queueId); + return repoRecord; + } + + private RepositoryRecord swapOutRecord(final Record record) { + final Long recordId = (Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID_FIELD); + final String queueId = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE)); + final String swapLocation = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE)); + final FlowFileQueue queue = getFlowFileQueue(queueId); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(recordId) + .build(); + + return new StandardRepositoryRecord(queue, flowFileRecord, swapLocation); + } + + @Override + public int getVersion() { + return MAX_ENCODING_VERSION; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java new file mode 100644 index 0000000..e8ce44e --- /dev/null +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.repository; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.controller.repository.claim.StandardContentClaim; +import org.apache.nifi.flowfile.FlowFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.wali.SerDe; +import org.wali.UpdateType; + +public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> { + private static final Logger logger = LoggerFactory.getLogger(WriteAheadRepositoryRecordSerde.class); + + private static final int CURRENT_ENCODING_VERSION = 9; + + public static final byte ACTION_CREATE = 0; + public static final byte ACTION_UPDATE = 1; + public static final byte ACTION_DELETE = 2; + public static final byte ACTION_SWAPPED_OUT = 3; + public static final byte ACTION_SWAPPED_IN = 4; + + private long recordsRestored = 0L; + private final ResourceClaimManager claimManager; + + public WriteAheadRepositoryRecordSerde(final ResourceClaimManager claimManager) { + this.claimManager = claimManager; + } + + @Override + public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out) throws IOException { + serializeEdit(previousRecordState, record, out, false); + } + + public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out, final boolean forceAttributesWritten) throws IOException { + if (record.isMarkedForAbort()) { + 
logger.warn("Repository Record {} is marked to be aborted; it will be persisted in the FlowFileRepository as a DELETE record", record); + out.write(ACTION_DELETE); + out.writeLong(getRecordIdentifier(record)); + serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out); + return; + } + + final UpdateType updateType = getUpdateType(record); + + if (updateType.equals(UpdateType.DELETE)) { + out.write(ACTION_DELETE); + out.writeLong(getRecordIdentifier(record)); + serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out); + return; + } + + // If there's a Destination Connection, that's the one that we want to associated with this record. + // However, on restart, we will restore the FlowFile and set this connection to its "originalConnection". + // If we then serialize the FlowFile again before it's transferred, it's important to allow this to happen, + // so we use the originalConnection instead + FlowFileQueue associatedQueue = record.getDestination(); + if (associatedQueue == null) { + associatedQueue = record.getOriginalQueue(); + } + + if (updateType.equals(UpdateType.SWAP_OUT)) { + out.write(ACTION_SWAPPED_OUT); + out.writeLong(getRecordIdentifier(record)); + out.writeUTF(associatedQueue.getIdentifier()); + out.writeUTF(getLocation(record)); + return; + } + + final FlowFile flowFile = record.getCurrent(); + final ContentClaim claim = record.getCurrentClaim(); + + switch (updateType) { + case UPDATE: + out.write(ACTION_UPDATE); + break; + case CREATE: + out.write(ACTION_CREATE); + break; + case SWAP_IN: + out.write(ACTION_SWAPPED_IN); + break; + default: + throw new AssertionError(); + } + + out.writeLong(getRecordIdentifier(record)); + out.writeLong(flowFile.getEntryDate()); + out.writeLong(flowFile.getLineageStartDate()); + out.writeLong(flowFile.getLineageStartIndex()); + + final Long queueDate = flowFile.getLastQueueDate(); + out.writeLong(queueDate == null ? 
System.currentTimeMillis() : queueDate); + out.writeLong(flowFile.getQueueDateIndex()); + out.writeLong(flowFile.getSize()); + + if (associatedQueue == null) { + logger.warn("{} Repository Record {} has no Connection associated with it; it will be destroyed on restart", + new Object[] {this, record}); + writeString("", out); + } else { + writeString(associatedQueue.getIdentifier(), out); + } + + serializeContentClaim(claim, record.getCurrentClaimOffset(), out); + + if (forceAttributesWritten || record.isAttributesChanged() || updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_IN) { + out.write(1); // indicate attributes changed + final Map<String, String> attributes = flowFile.getAttributes(); + out.writeInt(attributes.size()); + for (final Map.Entry<String, String> entry : attributes.entrySet()) { + writeString(entry.getKey(), out); + writeString(entry.getValue(), out); + } + } else { + out.write(0); // indicate attributes did not change + } + + if (updateType == UpdateType.SWAP_IN) { + out.writeUTF(record.getSwapLocation()); + } + } + + @Override + public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException { + final int action = in.read(); + final long recordId = in.readLong(); + if (action == ACTION_DELETE) { + final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId); + + if (version > 4) { + deserializeClaim(in, version, ffBuilder); + } + + final FlowFileRecord flowFileRecord = ffBuilder.build(); + final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord); + record.markForDelete(); + + return record; + } + + if (action == ACTION_SWAPPED_OUT) { + final String queueId = in.readUTF(); + final String location = in.readUTF(); + final FlowFileQueue queue = getFlowFileQueue(queueId); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + 
.id(recordId) + .build(); + + return new StandardRepositoryRecord(queue, flowFileRecord, location); + } + + final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); + final RepositoryRecord record = currentRecordStates.get(recordId); + ffBuilder.id(recordId); + if (record != null) { + ffBuilder.fromFlowFile(record.getCurrent()); + } + ffBuilder.entryDate(in.readLong()); + + if (version > 1) { + // read the lineage identifiers and lineage start date, which were added in version 2. + if (version < 9) { + final int numLineageIds = in.readInt(); + for (int i = 0; i < numLineageIds; i++) { + in.readUTF(); //skip identifiers + } + } + final long lineageStartDate = in.readLong(); + final long lineageStartIndex; + if (version > 7) { + lineageStartIndex = in.readLong(); + } else { + lineageStartIndex = 0L; + } + ffBuilder.lineageStart(lineageStartDate, lineageStartIndex); + + if (version > 5) { + final long lastQueueDate = in.readLong(); + final long queueDateIndex; + if (version > 7) { + queueDateIndex = in.readLong(); + } else { + queueDateIndex = 0L; + } + + ffBuilder.lastQueued(lastQueueDate, queueDateIndex); + } + } + + ffBuilder.size(in.readLong()); + final String connectionId = readString(in); + + logger.debug("{} -> {}", new Object[] {recordId, connectionId}); + + deserializeClaim(in, version, ffBuilder); + + // recover new attributes, if they changed + final int attributesChanged = in.read(); + if (attributesChanged == -1) { + throw new EOFException(); + } else if (attributesChanged == 1) { + final int numAttributes = in.readInt(); + final Map<String, String> attributes = new HashMap<>(); + for (int i = 0; i < numAttributes; i++) { + final String key = readString(in); + final String value = readString(in); + attributes.put(key, value); + } + + ffBuilder.addAttributes(attributes); + } else if (attributesChanged != 0) { + throw new IOException("Attribute Change Qualifier not found in stream; found value: " + + attributesChanged + " after 
successfully restoring " + recordsRestored + " records. The FlowFile Repository appears to be corrupt!"); + } + + final FlowFileRecord flowFile = ffBuilder.build(); + String swapLocation = null; + if (action == ACTION_SWAPPED_IN) { + swapLocation = in.readUTF(); + } + + final FlowFileQueue queue = getFlowFileQueue(connectionId); + final StandardRepositoryRecord standardRepoRecord = new StandardRepositoryRecord(queue, flowFile); + if (swapLocation != null) { + standardRepoRecord.setSwapLocation(swapLocation); + } + + if (connectionId.isEmpty()) { + logger.warn("{} does not have a Queue associated with it; this record will be discarded", flowFile); + standardRepoRecord.markForAbort(); + } else if (queue == null) { + logger.warn("{} maps to unknown Queue {}; this record will be discarded", flowFile, connectionId); + standardRepoRecord.markForAbort(); + } + + recordsRestored++; + return standardRepoRecord; + } + + @Override + public StandardRepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { + final int action = in.read(); + if (action == -1) { + return null; + } + + final long recordId = in.readLong(); + if (action == ACTION_DELETE) { + final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId); + + if (version > 4) { + deserializeClaim(in, version, ffBuilder); + } + + final FlowFileRecord flowFileRecord = ffBuilder.build(); + final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord); + record.markForDelete(); + return record; + } + + // if action was not delete, it must be create/swap in + final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); + final long entryDate = in.readLong(); + + if (version > 1) { + // read the lineage identifiers and lineage start date, which were added in version 2. 
+ if (version < 9) { + final int numLineageIds = in.readInt(); + for (int i = 0; i < numLineageIds; i++) { + in.readUTF(); //skip identifiers + } + } + + final long lineageStartDate = in.readLong(); + final long lineageStartIndex; + if (version > 7) { + lineageStartIndex = in.readLong(); + } else { + lineageStartIndex = 0L; + } + ffBuilder.lineageStart(lineageStartDate, lineageStartIndex); + + if (version > 5) { + final long lastQueueDate = in.readLong(); + final long queueDateIndex; + if (version > 7) { + queueDateIndex = in.readLong(); + } else { + queueDateIndex = 0L; + } + + ffBuilder.lastQueued(lastQueueDate, queueDateIndex); + } + } + + final long size = in.readLong(); + final String connectionId = readString(in); + + logger.debug("{} -> {}", new Object[] {recordId, connectionId}); + + ffBuilder.id(recordId); + ffBuilder.entryDate(entryDate); + ffBuilder.size(size); + + deserializeClaim(in, version, ffBuilder); + + final int attributesChanged = in.read(); + if (attributesChanged == 1) { + final int numAttributes = in.readInt(); + final Map<String, String> attributes = new HashMap<>(); + for (int i = 0; i < numAttributes; i++) { + final String key = readString(in); + final String value = readString(in); + attributes.put(key, value); + } + + ffBuilder.addAttributes(attributes); + } else if (attributesChanged == -1) { + throw new EOFException(); + } else if (attributesChanged != 0) { + throw new IOException("Attribute Change Qualifier not found in stream; found value: " + + attributesChanged + " after successfully restoring " + recordsRestored + " records"); + } + + final FlowFileRecord flowFile = ffBuilder.build(); + String swapLocation = null; + if (action == ACTION_SWAPPED_IN) { + swapLocation = in.readUTF(); + } + + final StandardRepositoryRecord record; + final FlowFileQueue queue = getFlowFileQueue(connectionId); + record = new StandardRepositoryRecord(queue, flowFile); + if (swapLocation != null) { + record.setSwapLocation(swapLocation); + } + + if 
(connectionId.isEmpty()) { + logger.warn("{} does not have a FlowFile Queue associated with it; this record will be discarded", flowFile); + record.markForAbort(); + } else if (queue == null) { + logger.warn("{} maps to unknown FlowFile Queue {}; this record will be discarded", flowFile, connectionId); + record.markForAbort(); + } + + recordsRestored++; + return record; + } + + @Override + public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException { + serializeEdit(null, record, out, true); + } + + private void serializeContentClaim(final ContentClaim claim, final long offset, final DataOutputStream out) throws IOException { + if (claim == null) { + out.write(0); + } else { + out.write(1); + + final ResourceClaim resourceClaim = claim.getResourceClaim(); + writeString(resourceClaim.getId(), out); + writeString(resourceClaim.getContainer(), out); + writeString(resourceClaim.getSection(), out); + out.writeLong(claim.getOffset()); + out.writeLong(claim.getLength()); + + out.writeLong(offset); + out.writeBoolean(resourceClaim.isLossTolerant()); + } + } + + private void deserializeClaim(final DataInputStream in, final int serializationVersion, final StandardFlowFileRecord.Builder ffBuilder) throws IOException { + // determine current Content Claim. 
+ final int claimExists = in.read(); + if (claimExists == 1) { + final String claimId; + if (serializationVersion < 4) { + claimId = String.valueOf(in.readLong()); + } else { + claimId = readString(in); + } + + final String container = readString(in); + final String section = readString(in); + + final long resourceOffset; + final long resourceLength; + if (serializationVersion < 7) { + resourceOffset = 0L; + resourceLength = -1L; + } else { + resourceOffset = in.readLong(); + resourceLength = in.readLong(); + } + + final long claimOffset = in.readLong(); + + final boolean lossTolerant; + if (serializationVersion >= 3) { + lossTolerant = in.readBoolean(); + } else { + lossTolerant = false; + } + + final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, false); + final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset); + contentClaim.setLength(resourceLength); + + ffBuilder.contentClaim(contentClaim); + ffBuilder.contentClaimOffset(claimOffset); + } else if (claimExists == -1) { + throw new EOFException(); + } else if (claimExists != 0) { + throw new IOException("Claim Existence Qualifier not found in stream; found value: " + + claimExists + " after successfully restoring " + recordsRestored + " records"); + } + } + + private void writeString(final String toWrite, final OutputStream out) throws IOException { + final byte[] bytes = toWrite.getBytes("UTF-8"); + final int utflen = bytes.length; + + if (utflen < 65535) { + out.write(utflen >>> 8); + out.write(utflen); + out.write(bytes); + } else { + out.write(255); + out.write(255); + out.write(utflen >>> 24); + out.write(utflen >>> 16); + out.write(utflen >>> 8); + out.write(utflen); + out.write(bytes); + } + } + + private String readString(final InputStream in) throws IOException { + final Integer numBytes = readFieldLength(in); + if (numBytes == null) { + throw new EOFException(); + } + final byte[] bytes = new 
byte[numBytes]; + fillBuffer(in, bytes, numBytes); + return new String(bytes, "UTF-8"); + } + + private Integer readFieldLength(final InputStream in) throws IOException { + final int firstValue = in.read(); + final int secondValue = in.read(); + if (firstValue < 0) { + return null; + } + if (secondValue < 0) { + throw new EOFException(); + } + if (firstValue == 0xff && secondValue == 0xff) { + final int ch1 = in.read(); + final int ch2 = in.read(); + final int ch3 = in.read(); + final int ch4 = in.read(); + if ((ch1 | ch2 | ch3 | ch4) < 0) { + throw new EOFException(); + } + return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4; + } else { + return (firstValue << 8) + secondValue; + } + } + + private void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException { + int bytesRead; + int totalBytesRead = 0; + while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) { + totalBytesRead += bytesRead; + } + if (totalBytesRead != length) { + throw new EOFException(); + } + } + + @Override + public int getVersion() { + return CURRENT_ENCODING_VERSION; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java new file mode 100644 index 0000000..b218ee6 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.controller.repository.schema;

import java.util.List;

import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.repository.schema.Record;
import org.apache.nifi.repository.schema.RecordField;
import org.apache.nifi.repository.schema.RecordSchema;

/**
 * Adapts a {@link ContentClaim}, together with the FlowFile's offset into that claim,
 * to the schema-based {@link Record} abstraction used for repository serialization.
 */
public class ContentClaimFieldMap implements Record {
    private final ContentClaim contentClaim;
    private final long contentClaimOffset;
    private final ResourceClaimFieldMap resourceClaimFieldMap;
    private final RecordSchema schema;

    public ContentClaimFieldMap(final ContentClaim contentClaim, final long contentClaimOffset, final RecordSchema schema) {
        this.contentClaim = contentClaim;
        this.contentClaimOffset = contentClaimOffset;
        this.schema = schema;

        // The Resource Claim is modeled as a nested record, so derive its schema
        // from the sub-fields of the Resource Claim field of the enclosing schema.
        final List<RecordField> subFields = schema.getField(ContentClaimSchema.RESOURCE_CLAIM).getSubFields();
        this.resourceClaimFieldMap = new ResourceClaimFieldMap(contentClaim.getResourceClaim(), new RecordSchema(subFields));
    }

    @Override
    public Object getFieldValue(final String fieldName) {
        if (fieldName.equals(ContentClaimSchema.RESOURCE_CLAIM)) {
            return resourceClaimFieldMap;
        } else if (fieldName.equals(ContentClaimSchema.CONTENT_CLAIM_LENGTH)) {
            return contentClaim.getLength();
        } else if (fieldName.equals(ContentClaimSchema.CONTENT_CLAIM_OFFSET)) {
            return contentClaimOffset;
        } else if (fieldName.equals(ContentClaimSchema.RESOURCE_CLAIM_OFFSET)) {
            return contentClaim.getOffset();
        }

        return null;
    }

    @Override
    public RecordSchema getSchema() {
        return schema;
    }

    @Override
    public int hashCode() {
        // Must stay in agreement with equals(): combines the claim offset with the resource claim hash.
        return (int) (31 + contentClaimOffset + 21 * resourceClaimFieldMap.hashCode());
    }

    @Override
    public boolean equals(final Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }

        final ContentClaimFieldMap that = (ContentClaimFieldMap) obj;
        if (contentClaimOffset != that.contentClaimOffset) {
            return false;
        }

        return resourceClaimFieldMap == null
            ? that.resourceClaimFieldMap == null
            : resourceClaimFieldMap.equals(that.resourceClaimFieldMap);
    }

    @Override
    public String toString() {
        return "ContentClaimFieldMap[" + contentClaim + "]";
    }

    /**
     * Reconstructs a {@link ContentClaim} from a previously serialized claim record.
     * The Resource Claim is resolved through the given manager rather than constructed directly.
     */
    public static ContentClaim getContentClaim(final Record claimRecord, final ResourceClaimManager resourceClaimManager) {
        final Record resourceRecord = (Record) claimRecord.getFieldValue(ContentClaimSchema.RESOURCE_CLAIM);
        final String container = (String) resourceRecord.getFieldValue(ContentClaimSchema.CLAIM_CONTAINER);
        final String section = (String) resourceRecord.getFieldValue(ContentClaimSchema.CLAIM_SECTION);
        final String identifier = (String) resourceRecord.getFieldValue(ContentClaimSchema.CLAIM_IDENTIFIER);
        final Boolean lossTolerant = (Boolean) resourceRecord.getFieldValue(ContentClaimSchema.LOSS_TOLERANT);

        final Long length = (Long) claimRecord.getFieldValue(ContentClaimSchema.CONTENT_CLAIM_LENGTH);
        final Long resourceOffset = (Long) claimRecord.getFieldValue(ContentClaimSchema.RESOURCE_CLAIM_OFFSET);

        final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, lossTolerant, false);
        final StandardContentClaim restoredClaim = new StandardContentClaim(resourceClaim, resourceOffset);
        restoredClaim.setLength(length);

        return restoredClaim;
    }

    /** Returns the FlowFile's offset into the content claim, or null if not present in the record. */
    public static Long getContentClaimOffset(final Record claimRecord) {
        return (Long) claimRecord.getFieldValue(ContentClaimSchema.CONTENT_CLAIM_OFFSET);
    }
}
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.controller.repository.schema;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.nifi.repository.schema.ComplexRecordField;
import org.apache.nifi.repository.schema.FieldType;
import org.apache.nifi.repository.schema.RecordField;
import org.apache.nifi.repository.schema.RecordSchema;
import org.apache.nifi.repository.schema.Repetition;
import org.apache.nifi.repository.schema.SimpleRecordField;

/**
 * Field names and schema definitions for serializing Resource Claims and Content Claims.
 * A Content Claim record embeds its Resource Claim as a nested complex field.
 */
public class ContentClaimSchema {

    // Resource Claim field names
    public static final String CLAIM_CONTAINER = "Container";
    public static final String CLAIM_SECTION = "Section";
    public static final String CLAIM_IDENTIFIER = "Identifier";
    public static final String LOSS_TOLERANT = "Loss Tolerant";
    public static final String RESOURCE_CLAIM = "Resource Claim";

    // Content Claim field names
    public static final String RESOURCE_CLAIM_OFFSET = "Resource Claim Offset"; // offset into resource claim where the content claim begins
    public static final String CONTENT_CLAIM_OFFSET = "Content Claim Offset"; // offset into the content claim where the flowfile begins
    public static final String CONTENT_CLAIM_LENGTH = "Content Claim Length";

    public static final RecordSchema CONTENT_CLAIM_SCHEMA_V1;
    public static final RecordSchema RESOURCE_CLAIM_SCHEMA_V1;

    static {
        // Resource Claim: identifies the physical file that holds the content.
        final List<RecordField> resourceFields = new ArrayList<>();
        resourceFields.add(new SimpleRecordField(CLAIM_CONTAINER, FieldType.STRING, Repetition.EXACTLY_ONE));
        resourceFields.add(new SimpleRecordField(CLAIM_SECTION, FieldType.STRING, Repetition.EXACTLY_ONE));
        resourceFields.add(new SimpleRecordField(CLAIM_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE));
        resourceFields.add(new SimpleRecordField(LOSS_TOLERANT, FieldType.BOOLEAN, Repetition.EXACTLY_ONE));
        RESOURCE_CLAIM_SCHEMA_V1 = new RecordSchema(Collections.unmodifiableList(resourceFields));

        // Content Claim: a (resource claim, offset, length) triple locating a FlowFile's content.
        final List<RecordField> contentFields = new ArrayList<>();
        contentFields.add(new ComplexRecordField(RESOURCE_CLAIM, Repetition.EXACTLY_ONE, resourceFields));
        contentFields.add(new SimpleRecordField(RESOURCE_CLAIM_OFFSET, FieldType.LONG, Repetition.EXACTLY_ONE));
        contentFields.add(new SimpleRecordField(CONTENT_CLAIM_OFFSET, FieldType.LONG, Repetition.EXACTLY_ONE));
        contentFields.add(new SimpleRecordField(CONTENT_CLAIM_LENGTH, FieldType.LONG, Repetition.EXACTLY_ONE));
        CONTENT_CLAIM_SCHEMA_V1 = new RecordSchema(Collections.unmodifiableList(contentFields));
    }
}
--- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileRecordFieldMap.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.repository.schema; + +import java.util.Map; + +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.StandardFlowFileRecord; +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.repository.schema.Record; +import org.apache.nifi.repository.schema.RecordField; +import org.apache.nifi.repository.schema.RecordSchema; + +public class FlowFileRecordFieldMap implements Record { + private final FlowFileRecord flowFile; + private final RecordSchema schema; + private final RecordSchema contentClaimSchema; + private final ContentClaimFieldMap contentClaim; + + public FlowFileRecordFieldMap(final FlowFileRecord flowFile, final RecordSchema schema) { + this.flowFile = flowFile; + this.schema = schema; + + final RecordField contentClaimField = schema.getField(FlowFileSchema.CONTENT_CLAIM); + contentClaimSchema = new RecordSchema(contentClaimField.getSubFields()); + contentClaim = flowFile.getContentClaim() == null ? 
null : new ContentClaimFieldMap(flowFile.getContentClaim(), flowFile.getContentClaimOffset(), contentClaimSchema); + } + + @Override + public RecordSchema getSchema() { + return schema; + } + + @Override + public Object getFieldValue(final String fieldName) { + switch (fieldName) { + case FlowFileSchema.ATTRIBUTES: + return flowFile.getAttributes(); + case FlowFileSchema.CONTENT_CLAIM: + return contentClaim; + case FlowFileSchema.ENTRY_DATE: + return flowFile.getEntryDate(); + case FlowFileSchema.FLOWFILE_SIZE: + return flowFile.getSize(); + case FlowFileSchema.LINEAGE_START_DATE: + return flowFile.getLineageStartDate(); + case FlowFileSchema.LINEAGE_START_INDEX: + return flowFile.getLineageStartIndex(); + case FlowFileSchema.QUEUE_DATE: + return flowFile.getLastQueueDate(); + case FlowFileSchema.QUEUE_DATE_INDEX: + return flowFile.getQueueDateIndex(); + case FlowFileSchema.RECORD_ID: + return flowFile.getId(); + } + + return null; + } + + @SuppressWarnings("unchecked") + public static FlowFileRecord getFlowFile(final Record record, final ResourceClaimManager claimManager) { + final StandardFlowFileRecord.Builder builder = new StandardFlowFileRecord.Builder(); + builder.id((Long) record.getFieldValue(FlowFileSchema.RECORD_ID)); + builder.entryDate((Long) record.getFieldValue(FlowFileSchema.ENTRY_DATE)); + builder.size((Long) record.getFieldValue(FlowFileSchema.FLOWFILE_SIZE)); + builder.addAttributes((Map<String, String>) record.getFieldValue(FlowFileSchema.ATTRIBUTES)); + builder.lineageStart((Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_DATE), (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_INDEX)); + builder.lastQueued((Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE), (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE_INDEX)); + + final Record contentClaimRecord = (Record) record.getFieldValue(FlowFileSchema.CONTENT_CLAIM); + if (contentClaimRecord != null) { + final ContentClaim claim = 
ContentClaimFieldMap.getContentClaim(contentClaimRecord, claimManager); + builder.contentClaim(claim); + + final Long offset = ContentClaimFieldMap.getContentClaimOffset(contentClaimRecord); + if (offset != null) { + builder.contentClaimOffset(offset); + } + } + + return builder.build(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java new file mode 100644 index 0000000..6af3066 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/FlowFileSchema.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.repository.schema; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.repository.schema.ComplexRecordField; +import org.apache.nifi.repository.schema.FieldType; +import org.apache.nifi.repository.schema.MapRecordField; +import org.apache.nifi.repository.schema.RecordField; +import org.apache.nifi.repository.schema.RecordSchema; +import org.apache.nifi.repository.schema.Repetition; +import org.apache.nifi.repository.schema.SimpleRecordField; + +public class FlowFileSchema { + + public static final String RECORD_ID = "Record ID"; + public static final String ENTRY_DATE = "Entry Date"; + public static final String LINEAGE_START_DATE = "Lineage Start Date"; + public static final String LINEAGE_START_INDEX = "Lineage Start Index"; + public static final String QUEUE_DATE = "Queued Date"; + public static final String QUEUE_DATE_INDEX = "Queued Date Index"; + public static final String FLOWFILE_SIZE = "FlowFile Size"; + public static final String CONTENT_CLAIM = "Content Claim"; + public static final String ATTRIBUTES = "Attributes"; + + // attribute fields + public static final String ATTRIBUTE_NAME = "Attribute Name"; + public static final String ATTRIBUTE_VALUE = "Attribute Value"; + + public static final RecordSchema FLOWFILE_SCHEMA_V1; + public static final RecordSchema FLOWFILE_SCHEMA_V2; + + static { + final List<RecordField> flowFileFields = new ArrayList<>(); + + final RecordField attributeNameField = new SimpleRecordField(ATTRIBUTE_NAME, FieldType.STRING, Repetition.EXACTLY_ONE); + final RecordField attributeValueField = new SimpleRecordField(ATTRIBUTE_VALUE, FieldType.STRING, Repetition.EXACTLY_ONE); + + flowFileFields.add(new SimpleRecordField(RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(ENTRY_DATE, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(LINEAGE_START_DATE, FieldType.LONG, 
Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(LINEAGE_START_INDEX, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(QUEUE_DATE, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(QUEUE_DATE_INDEX, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(FLOWFILE_SIZE, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new ComplexRecordField(CONTENT_CLAIM, Repetition.ZERO_OR_ONE, ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1.getFields())); + flowFileFields.add(new MapRecordField(ATTRIBUTES, attributeNameField, attributeValueField, Repetition.ZERO_OR_ONE)); + + FLOWFILE_SCHEMA_V1 = new RecordSchema(flowFileFields); + } + + static { + final List<RecordField> flowFileFields = new ArrayList<>(); + + final RecordField attributeNameField = new SimpleRecordField(ATTRIBUTE_NAME, FieldType.LONG_STRING, Repetition.EXACTLY_ONE); + final RecordField attributeValueField = new SimpleRecordField(ATTRIBUTE_VALUE, FieldType.LONG_STRING, Repetition.EXACTLY_ONE); + + flowFileFields.add(new SimpleRecordField(RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(ENTRY_DATE, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(LINEAGE_START_DATE, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(LINEAGE_START_INDEX, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(QUEUE_DATE, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(QUEUE_DATE_INDEX, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new SimpleRecordField(FLOWFILE_SIZE, FieldType.LONG, Repetition.EXACTLY_ONE)); + flowFileFields.add(new ComplexRecordField(CONTENT_CLAIM, Repetition.ZERO_OR_ONE, ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1.getFields())); + flowFileFields.add(new MapRecordField(ATTRIBUTES, 
attributeNameField, attributeValueField, Repetition.ZERO_OR_ONE)); + + FLOWFILE_SCHEMA_V2 = new RecordSchema(flowFileFields); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java new file mode 100644 index 0000000..5fe4889 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.repository.schema; + +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.controller.repository.RepositoryRecord; +import org.apache.nifi.repository.schema.Record; +import org.apache.nifi.repository.schema.RecordSchema; + +public class RepositoryRecordFieldMap implements Record { + private final RepositoryRecord record; + private final FlowFileRecord flowFile; + private final RecordSchema schema; + private final RecordSchema contentClaimSchema; + + public RepositoryRecordFieldMap(final RepositoryRecord record, final RecordSchema repoRecordSchema, final RecordSchema contentClaimSchema) { + this.schema = repoRecordSchema; + this.contentClaimSchema = contentClaimSchema; + this.record = record; + this.flowFile = record.getCurrent(); + } + + @Override + public Object getFieldValue(final String fieldName) { + switch (fieldName) { + case RepositoryRecordSchema.ACTION_TYPE: + return record.getType().name(); + case RepositoryRecordSchema.RECORD_ID: + return record.getCurrent().getId(); + case RepositoryRecordSchema.SWAP_LOCATION: + return record.getSwapLocation(); + case FlowFileSchema.ATTRIBUTES: + return flowFile.getAttributes(); + case FlowFileSchema.ENTRY_DATE: + return flowFile.getEntryDate(); + case FlowFileSchema.FLOWFILE_SIZE: + return flowFile.getSize(); + case FlowFileSchema.LINEAGE_START_DATE: + return flowFile.getLineageStartDate(); + case FlowFileSchema.LINEAGE_START_INDEX: + return flowFile.getLineageStartIndex(); + case FlowFileSchema.QUEUE_DATE: + return flowFile.getLastQueueDate(); + case FlowFileSchema.QUEUE_DATE_INDEX: + return flowFile.getQueueDateIndex(); + case FlowFileSchema.CONTENT_CLAIM: + final ContentClaimFieldMap contentClaimFieldMap = record.getCurrentClaim() == null ? 
null + : new ContentClaimFieldMap(record.getCurrentClaim(), record.getCurrentClaimOffset(), contentClaimSchema); + return contentClaimFieldMap; + case RepositoryRecordSchema.QUEUE_IDENTIFIER: + final FlowFileQueue queue = record.getDestination() == null ? record.getOriginalQueue() : record.getDestination(); + return queue == null ? null : queue.getIdentifier(); + default: + return null; + } + } + + @Override + public RecordSchema getSchema() { + return schema; + } + + @Override + public String toString() { + return "RepositoryRecordFieldMap[" + record + "]"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java new file mode 100644 index 0000000..db77c8b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository.schema; + +import org.apache.nifi.repository.schema.ComplexRecordField; +import org.apache.nifi.repository.schema.FieldType; +import org.apache.nifi.repository.schema.RecordField; +import org.apache.nifi.repository.schema.RecordSchema; +import org.apache.nifi.repository.schema.Repetition; +import org.apache.nifi.repository.schema.SimpleRecordField; +import org.apache.nifi.repository.schema.UnionRecordField; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class RepositoryRecordSchema { + public static final String REPOSITORY_RECORD_UPDATE_V1 = "Repository Record Update"; // top level field name + public static final String REPOSITORY_RECORD_UPDATE_V2 = "Repository Record Update"; // top level field name + + // repository record fields + public static final String ACTION_TYPE = "Action"; + public static final String RECORD_ID = "Record ID"; + public static final String QUEUE_IDENTIFIER = "Queue Identifier"; + public static final String SWAP_LOCATION = "Swap Location"; + + // Update types + public static final String CREATE_OR_UPDATE_ACTION = "Create or Update"; + public static final String DELETE_ACTION = "Delete"; + public static final String SWAP_IN_ACTION = "Swap In"; + public static final String SWAP_OUT_ACTION = "Swap Out"; + + public static final RecordSchema REPOSITORY_RECORD_SCHEMA_V1; + public static final RecordSchema CREATE_OR_UPDATE_SCHEMA_V1; + public static final RecordSchema DELETE_SCHEMA_V1; + public static final RecordSchema 
SWAP_IN_SCHEMA_V1; + public static final RecordSchema SWAP_OUT_SCHEMA_V1; + + public static final RecordSchema REPOSITORY_RECORD_SCHEMA_V2; + public static final RecordSchema CREATE_OR_UPDATE_SCHEMA_V2; + public static final RecordSchema DELETE_SCHEMA_V2; + public static final RecordSchema SWAP_IN_SCHEMA_V2; + public static final RecordSchema SWAP_OUT_SCHEMA_V2; + + public static final RecordField ACTION_TYPE_FIELD = new SimpleRecordField(ACTION_TYPE, FieldType.STRING, Repetition.EXACTLY_ONE); + public static final RecordField RECORD_ID_FIELD = new SimpleRecordField(RECORD_ID, FieldType.LONG, Repetition.EXACTLY_ONE); + + static { + // Fields for "Create" or "Update" records + final List<RecordField> createOrUpdateFields = new ArrayList<>(); + createOrUpdateFields.add(ACTION_TYPE_FIELD); + createOrUpdateFields.addAll(FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields()); + + createOrUpdateFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE)); + createOrUpdateFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.ZERO_OR_ONE)); + final ComplexRecordField createOrUpdate = new ComplexRecordField(CREATE_OR_UPDATE_ACTION, Repetition.EXACTLY_ONE, createOrUpdateFields); + CREATE_OR_UPDATE_SCHEMA_V1 = new RecordSchema(createOrUpdateFields); + + // Fields for "Delete" records + final List<RecordField> deleteFields = new ArrayList<>(); + deleteFields.add(ACTION_TYPE_FIELD); + deleteFields.add(RECORD_ID_FIELD); + final ComplexRecordField delete = new ComplexRecordField(DELETE_ACTION, Repetition.EXACTLY_ONE, deleteFields); + DELETE_SCHEMA_V1 = new RecordSchema(deleteFields); + + // Fields for "Swap Out" records + final List<RecordField> swapOutFields = new ArrayList<>(); + swapOutFields.add(ACTION_TYPE_FIELD); + swapOutFields.add(RECORD_ID_FIELD); + swapOutFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE)); + swapOutFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, 
Repetition.EXACTLY_ONE)); + final ComplexRecordField swapOut = new ComplexRecordField(SWAP_OUT_ACTION, Repetition.EXACTLY_ONE, swapOutFields); + SWAP_OUT_SCHEMA_V1 = new RecordSchema(swapOutFields); + + // Fields for "Swap In" records + final List<RecordField> swapInFields = new ArrayList<>(createOrUpdateFields); + swapInFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE)); + final ComplexRecordField swapIn = new ComplexRecordField(SWAP_IN_ACTION, Repetition.EXACTLY_ONE, swapInFields); + SWAP_IN_SCHEMA_V1 = new RecordSchema(swapInFields); + + // Union Field that creates the top-level field type + final UnionRecordField repoUpdateField = new UnionRecordField(REPOSITORY_RECORD_UPDATE_V1, Repetition.EXACTLY_ONE, createOrUpdate, delete, swapOut, swapIn); + REPOSITORY_RECORD_SCHEMA_V1 = new RecordSchema(Collections.singletonList(repoUpdateField)); + } + + static { + // Fields for "Create" or "Update" records + final List<RecordField> createOrUpdateFields = new ArrayList<>(); + createOrUpdateFields.add(ACTION_TYPE_FIELD); + createOrUpdateFields.addAll(FlowFileSchema.FLOWFILE_SCHEMA_V2.getFields()); + + createOrUpdateFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE)); + createOrUpdateFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.ZERO_OR_ONE)); + final ComplexRecordField createOrUpdate = new ComplexRecordField(CREATE_OR_UPDATE_ACTION, Repetition.EXACTLY_ONE, createOrUpdateFields); + CREATE_OR_UPDATE_SCHEMA_V2 = new RecordSchema(createOrUpdateFields); + + // Fields for "Delete" records + final List<RecordField> deleteFields = new ArrayList<>(); + deleteFields.add(ACTION_TYPE_FIELD); + deleteFields.add(RECORD_ID_FIELD); + final ComplexRecordField delete = new ComplexRecordField(DELETE_ACTION, Repetition.EXACTLY_ONE, deleteFields); + DELETE_SCHEMA_V2 = new RecordSchema(deleteFields); + + // Fields for "Swap Out" records + final List<RecordField> swapOutFields = new 
ArrayList<>(); + swapOutFields.add(ACTION_TYPE_FIELD); + swapOutFields.add(RECORD_ID_FIELD); + swapOutFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE)); + swapOutFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE)); + final ComplexRecordField swapOut = new ComplexRecordField(SWAP_OUT_ACTION, Repetition.EXACTLY_ONE, swapOutFields); + SWAP_OUT_SCHEMA_V2 = new RecordSchema(swapOutFields); + + // Fields for "Swap In" records + final List<RecordField> swapInFields = new ArrayList<>(createOrUpdateFields); + swapInFields.add(new SimpleRecordField(SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE)); + final ComplexRecordField swapIn = new ComplexRecordField(SWAP_IN_ACTION, Repetition.EXACTLY_ONE, swapInFields); + SWAP_IN_SCHEMA_V2 = new RecordSchema(swapInFields); + + // Union Field that creates the top-level field type + final UnionRecordField repoUpdateField = new UnionRecordField(REPOSITORY_RECORD_UPDATE_V2, Repetition.EXACTLY_ONE, createOrUpdate, delete, swapOut, swapIn); + REPOSITORY_RECORD_SCHEMA_V2 = new RecordSchema(Collections.singletonList(repoUpdateField)); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java new file mode 100644 index 0000000..93fa4e4 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.controller.repository.schema; + +import org.apache.nifi.controller.repository.RepositoryRecordType; +import org.apache.nifi.repository.schema.NamedValue; +import org.apache.nifi.repository.schema.Record; +import org.apache.nifi.repository.schema.RecordSchema; +import org.wali.UpdateType; + +public class RepositoryRecordUpdate implements Record { + private final RecordSchema schema; + private final RepositoryRecordFieldMap fieldMap; + + public RepositoryRecordUpdate(final RepositoryRecordFieldMap fieldMap, final RecordSchema schema) { + this.schema = schema; + this.fieldMap = fieldMap; + } + + @Override + public RecordSchema getSchema() { + return schema; + } + + @Override + public Object getFieldValue(final String fieldName) { + if (RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V2.equals(fieldName)) { + String actionType = (String) fieldMap.getFieldValue(RepositoryRecordSchema.ACTION_TYPE); + if 
(RepositoryRecordType.CONTENTMISSING.name().equals(actionType)) { + actionType = RepositoryRecordType.DELETE.name(); + } + final UpdateType updateType = UpdateType.valueOf(actionType); + + final String actionName; + switch (updateType) { + case CREATE: + case UPDATE: + actionName = RepositoryRecordSchema.CREATE_OR_UPDATE_ACTION; + break; + case DELETE: + actionName = RepositoryRecordSchema.DELETE_ACTION; + break; + case SWAP_IN: + actionName = RepositoryRecordSchema.SWAP_IN_ACTION; + break; + case SWAP_OUT: + actionName = RepositoryRecordSchema.SWAP_OUT_ACTION; + break; + default: + return null; + } + + return new NamedValue(actionName, fieldMap); + } + return null; + } + +}
