http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java new file mode 100644 index 0000000..afa19ea --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/main/java/org/apache/nifi/controller/repository/schema/ResourceClaimFieldMap.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.repository.schema; + +import org.apache.nifi.controller.repository.claim.ResourceClaim; +import org.apache.nifi.controller.repository.claim.ResourceClaimManager; +import org.apache.nifi.repository.schema.Record; +import org.apache.nifi.repository.schema.RecordSchema; + +public class ResourceClaimFieldMap implements Record { + private final ResourceClaim resourceClaim; + private final RecordSchema schema; + + public ResourceClaimFieldMap(final ResourceClaim resourceClaim, final RecordSchema schema) { + this.resourceClaim = resourceClaim; + this.schema = schema; + } + + @Override + public RecordSchema getSchema() { + return schema; + } + + @Override + public Object getFieldValue(final String fieldName) { + switch (fieldName) { + case ContentClaimSchema.CLAIM_CONTAINER: + return resourceClaim.getContainer(); + case ContentClaimSchema.CLAIM_SECTION: + return resourceClaim.getSection(); + case ContentClaimSchema.CLAIM_IDENTIFIER: + return resourceClaim.getId(); + case ContentClaimSchema.LOSS_TOLERANT: + return resourceClaim.isLossTolerant(); + } + + return null; + } + + public static ResourceClaim getResourceClaim(final Record record, final ResourceClaimManager claimManager) { + final String container = (String) record.getFieldValue(ContentClaimSchema.CLAIM_CONTAINER); + final String section = (String) record.getFieldValue(ContentClaimSchema.CLAIM_SECTION); + final String identifier = (String) record.getFieldValue(ContentClaimSchema.CLAIM_IDENTIFIER); + final Boolean lossTolerant = (Boolean) record.getFieldValue(ContentClaimSchema.LOSS_TOLERANT); + + return claimManager.newResourceClaim(container, section, identifier, lossTolerant, false); + } + + @Override + public int hashCode() { + return 41 + 91 * resourceClaim.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + + if (obj.getClass() != ResourceClaimFieldMap.class) { + 
return false; + } + + final ResourceClaimFieldMap other = (ResourceClaimFieldMap) obj; + return resourceClaim.equals(other.resourceClaim); + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java new file mode 100644 index 0000000..59b0e7b --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-flowfile-repo-serialization/src/test/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerdeTest.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.controller.repository; + +import org.apache.nifi.controller.queue.FlowFileQueue; +import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; +import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.nifi.controller.repository.RepositoryRecordType.SWAP_IN; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SchemaRepositoryRecordSerdeTest { + public static final String TEST_QUEUE_IDENTIFIER = "testQueueIdentifier"; + private StandardResourceClaimManager resourceClaimManager; + private SchemaRepositoryRecordSerde schemaRepositoryRecordSerde; + private Map<String, FlowFileQueue> queueMap; + private FlowFileQueue flowFileQueue; + private ByteArrayOutputStream byteArrayOutputStream; + private DataOutputStream dataOutputStream; + + @Before + public void setup() { + resourceClaimManager = new StandardResourceClaimManager(); + schemaRepositoryRecordSerde = new SchemaRepositoryRecordSerde(resourceClaimManager); + queueMap = new HashMap<>(); + schemaRepositoryRecordSerde.setQueueMap(queueMap); + flowFileQueue = createMockQueue(TEST_QUEUE_IDENTIFIER); + byteArrayOutputStream = new ByteArrayOutputStream(); + dataOutputStream = new DataOutputStream(byteArrayOutputStream); + } + + @After + public void teardown() { + resourceClaimManager.purge(); + } + + @Test + public void testV1CreateCantHandleLongAttributeName() throws IOException { + RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map<String, String> attributes = 
new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put(stringBuilder.toString(), "testValue"); + schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream, + RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV1CreateCantHandleLongAttributeValue() throws IOException { + RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map<String, String> attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put("testName", stringBuilder.toString()); + schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream, + RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV2CreateCanHandleLongAttributeName() throws IOException { + schemaRepositoryRecordSerde.writeHeader(dataOutputStream); + Map<String, String> attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put(stringBuilder.toString(), 
"testValue"); + schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV2CreateCanHandleLongAttributeValue() throws IOException { + schemaRepositoryRecordSerde.writeHeader(dataOutputStream); + Map<String, String> attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put("testName", stringBuilder.toString()); + schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testRoundTripCreateV1ToV2() throws IOException { + RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map<String, String> attributes = new HashMap<>(); + attributes.put("testName", "testValue"); + schemaRepositoryRecordSerde.serializeRecord(createCreateFlowFileRecord(attributes), dataOutputStream, + RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public 
void testV1SwapInCantHandleLongAttributeName() throws IOException { + RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map<String, String> attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put(stringBuilder.toString(), "testValue"); + StandardRepositoryRecord record = createCreateFlowFileRecord(attributes); + record.setSwapLocation("fake"); + assertEquals(SWAP_IN, record.getType()); + schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertNotEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV1SwapInCantHandleLongAttributeValue() throws IOException { + RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map<String, String> attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put("testName", stringBuilder.toString()); + StandardRepositoryRecord record = createCreateFlowFileRecord(attributes); + record.setSwapLocation("fake"); + assertEquals(SWAP_IN, record.getType()); + schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertNotEquals(attributes, 
repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV2SwapInCanHandleLongAttributeName() throws IOException { + schemaRepositoryRecordSerde.writeHeader(dataOutputStream); + Map<String, String> attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put(stringBuilder.toString(), "testValue"); + StandardRepositoryRecord record = createCreateFlowFileRecord(attributes); + record.setSwapLocation("fake"); + assertEquals(SWAP_IN, record.getType()); + schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testV2SwapInCanHandleLongAttributeValue() throws IOException { + schemaRepositoryRecordSerde.writeHeader(dataOutputStream); + Map<String, String> attributes = new HashMap<>(); + StringBuilder stringBuilder = new StringBuilder(); + for (int i = 0; i < 65536; i++) { + stringBuilder.append('a'); + } + attributes.put("testName", stringBuilder.toString()); + StandardRepositoryRecord record = createCreateFlowFileRecord(attributes); + record.setSwapLocation("fake"); + assertEquals(SWAP_IN, record.getType()); + schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + } + + @Test + public void testRoundTripSwapInV1ToV2() throws IOException { + 
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1.writeTo(dataOutputStream); + Map<String, String> attributes = new HashMap<>(); + attributes.put("testName", "testValue"); + StandardRepositoryRecord record = createCreateFlowFileRecord(attributes); + record.setSwapLocation("fake"); + assertEquals(SWAP_IN, record.getType()); + schemaRepositoryRecordSerde.serializeRecord(record, dataOutputStream, RepositoryRecordSchema.SWAP_IN_SCHEMA_V1, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1); + + DataInputStream dataInputStream = createDataInputStream(); + schemaRepositoryRecordSerde.readHeader(dataInputStream); + RepositoryRecord repositoryRecord = schemaRepositoryRecordSerde.deserializeRecord(dataInputStream, 2); + assertEquals(attributes, repositoryRecord.getCurrent().getAttributes()); + assertEquals(SWAP_IN, repositoryRecord.getType()); + } + + private DataInputStream createDataInputStream() throws IOException { + dataOutputStream.flush(); + return new DataInputStream(new ByteArrayInputStream(byteArrayOutputStream.toByteArray())); + } + + private StandardRepositoryRecord createCreateFlowFileRecord(Map<String, String> attributes) { + StandardRepositoryRecord standardRepositoryRecord = new StandardRepositoryRecord(flowFileQueue); + StandardFlowFileRecord.Builder flowFileRecordBuilder = new StandardFlowFileRecord.Builder(); + flowFileRecordBuilder.addAttributes(attributes); + standardRepositoryRecord.setWorking(flowFileRecordBuilder.build()); + return standardRepositoryRecord; + } + + private FlowFileQueue createMockQueue(String identifier) { + FlowFileQueue flowFileQueue = mock(FlowFileQueue.class); + when(flowFileQueue.getIdentifier()).thenReturn(identifier); + queueMap.put(identifier, flowFileQueue); + return flowFileQueue; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml index 54d777f..6395e6e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/pom.xml @@ -57,6 +57,10 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-repository-models</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-properties</artifactId> </dependency> <dependency> @@ -136,6 +140,10 @@ <artifactId>nifi-write-ahead-log</artifactId> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-flowfile-repo-serialization</artifactId> + </dependency> + <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java deleted file mode 100644 index 44ed62d..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerde.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.controller.repository; - -import java.util.Map; - -import org.apache.nifi.controller.queue.FlowFileQueue; -import org.wali.SerDe; -import org.wali.UpdateType; - -public abstract class RepositoryRecordSerde implements SerDe<RepositoryRecord> { - private Map<String, FlowFileQueue> flowFileQueueMap = null; - - protected void setQueueMap(final Map<String, FlowFileQueue> queueMap) { - this.flowFileQueueMap = queueMap; - } - - protected Map<String, FlowFileQueue> getQueueMap() { - return flowFileQueueMap; - } - - protected FlowFileQueue getFlowFileQueue(final String queueId) { - return flowFileQueueMap.get(queueId); - } - - @Override - public Long getRecordIdentifier(final RepositoryRecord record) { - return record.getCurrent().getId(); - } - - @Override - public UpdateType getUpdateType(final RepositoryRecord record) { - switch (record.getType()) { - case CONTENTMISSING: - case DELETE: - return UpdateType.DELETE; - case CREATE: - return UpdateType.CREATE; - case UPDATE: - return UpdateType.UPDATE; - case SWAP_OUT: - return UpdateType.SWAP_OUT; - case SWAP_IN: - return UpdateType.SWAP_IN; - } - return null; - } - - @Override - public String getLocation(final RepositoryRecord record) { - return record.getSwapLocation(); - } -} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java deleted file mode 100644 index c19fa94..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordSerdeFactory.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.nifi.controller.repository; - -import java.util.Map; - -import org.apache.nifi.controller.queue.FlowFileQueue; -import org.apache.nifi.controller.repository.claim.ResourceClaimManager; -import org.wali.SerDe; -import org.wali.SerDeFactory; -import org.wali.UpdateType; - -public class RepositoryRecordSerdeFactory implements SerDeFactory<RepositoryRecord> { - private final String LEGACY_SERDE_ENCODING_NAME = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository$WriteAheadRecordSerde"; - private final ResourceClaimManager resourceClaimManager; - private Map<String, FlowFileQueue> flowFileQueueMap = null; - - public RepositoryRecordSerdeFactory(final ResourceClaimManager claimManager) { - this.resourceClaimManager = claimManager; - } - - protected void setQueueMap(final Map<String, FlowFileQueue> queueMap) { - this.flowFileQueueMap = queueMap; - } - - protected Map<String, FlowFileQueue> getQueueMap() { - return flowFileQueueMap; - } - - @Override - public SerDe<RepositoryRecord> createSerDe(final String encodingName) { - if (encodingName == null || SchemaRepositoryRecordSerde.class.getName().equals(encodingName)) { - final SchemaRepositoryRecordSerde serde = new SchemaRepositoryRecordSerde(resourceClaimManager); - serde.setQueueMap(flowFileQueueMap); - return serde; - } - - if (WriteAheadRepositoryRecordSerde.class.getName().equals(encodingName) - || LEGACY_SERDE_ENCODING_NAME.equals(encodingName)) { - final WriteAheadRepositoryRecordSerde serde = new WriteAheadRepositoryRecordSerde(resourceClaimManager); - serde.setQueueMap(flowFileQueueMap); - return serde; - } - - throw new IllegalArgumentException("Cannot create Deserializer for Repository Records because the encoding '" + encodingName + "' is not known"); - } - - protected FlowFileQueue getFlowFileQueue(final String queueId) { - return flowFileQueueMap.get(queueId); - } - - @Override - public Long getRecordIdentifier(final RepositoryRecord record) { - return 
record.getCurrent().getId(); - } - - @Override - public UpdateType getUpdateType(final RepositoryRecord record) { - switch (record.getType()) { - case CONTENTMISSING: - case DELETE: - return UpdateType.DELETE; - case CREATE: - return UpdateType.CREATE; - case UPDATE: - return UpdateType.UPDATE; - case SWAP_OUT: - return UpdateType.SWAP_OUT; - case SWAP_IN: - return UpdateType.SWAP_IN; - } - return null; - } - - @Override - public String getLocation(final RepositoryRecord record) { - return record.getSwapLocation(); - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java deleted file mode 100644 index 221f8ce..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java +++ /dev/null @@ -1,239 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.nifi.controller.repository; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.Map; - -import org.apache.nifi.controller.queue.FlowFileQueue; -import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ResourceClaimManager; -import org.apache.nifi.controller.repository.schema.ContentClaimFieldMap; -import org.apache.nifi.controller.repository.schema.ContentClaimSchema; -import org.apache.nifi.controller.repository.schema.FlowFileSchema; -import org.apache.nifi.controller.repository.schema.RepositoryRecordFieldMap; -import org.apache.nifi.controller.repository.schema.RepositoryRecordSchema; -import org.apache.nifi.controller.repository.schema.RepositoryRecordUpdate; -import org.apache.nifi.repository.schema.FieldType; -import org.apache.nifi.repository.schema.Record; -import org.apache.nifi.repository.schema.RecordSchema; -import org.apache.nifi.repository.schema.Repetition; -import org.apache.nifi.repository.schema.SchemaRecordReader; -import org.apache.nifi.repository.schema.SchemaRecordWriter; -import org.apache.nifi.repository.schema.SimpleRecordField; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wali.SerDe; -import org.wali.UpdateType; - -public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> { - private static final Logger logger = LoggerFactory.getLogger(SchemaRepositoryRecordSerde.class); - private static final int 
MAX_ENCODING_VERSION = 2; - - private final RecordSchema writeSchema = RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V2; - private final RecordSchema contentClaimSchema = ContentClaimSchema.CONTENT_CLAIM_SCHEMA_V1; - - private final ResourceClaimManager resourceClaimManager; - private volatile RecordSchema recoverySchema; - - public SchemaRepositoryRecordSerde(final ResourceClaimManager resourceClaimManager) { - this.resourceClaimManager = resourceClaimManager; - } - - @Override - public void writeHeader(final DataOutputStream out) throws IOException { - writeSchema.writeTo(out); - } - - @Override - public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord newRecordState, final DataOutputStream out) throws IOException { - serializeRecord(newRecordState, out); - } - - @Override - public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException { - final RecordSchema schema; - switch (record.getType()) { - case CREATE: - case UPDATE: - schema = RepositoryRecordSchema.CREATE_OR_UPDATE_SCHEMA_V2; - break; - case CONTENTMISSING: - case DELETE: - schema = RepositoryRecordSchema.DELETE_SCHEMA_V2; - break; - case SWAP_IN: - schema = RepositoryRecordSchema.SWAP_IN_SCHEMA_V2; - break; - case SWAP_OUT: - schema = RepositoryRecordSchema.SWAP_OUT_SCHEMA_V2; - break; - default: - throw new IllegalArgumentException("Received Repository Record with unknown Update Type: " + record.getType()); // won't happen. 
- } - - serializeRecord(record, out, schema, RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V2); - } - - - protected void serializeRecord(final RepositoryRecord record, final DataOutputStream out, RecordSchema schema, RecordSchema repositoryRecordSchema) throws IOException { - final RepositoryRecordFieldMap fieldMap = new RepositoryRecordFieldMap(record, schema, contentClaimSchema); - final RepositoryRecordUpdate update = new RepositoryRecordUpdate(fieldMap, repositoryRecordSchema); - new SchemaRecordWriter().writeRecord(update, out); - } - - @Override - public void readHeader(final DataInputStream in) throws IOException { - recoverySchema = RecordSchema.readFrom(in); - } - - @Override - public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException { - return deserializeRecord(in, version); - } - - @Override - public RepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { - final SchemaRecordReader reader = SchemaRecordReader.fromSchema(recoverySchema); - final Record updateRecord = reader.readRecord(in); - if (updateRecord == null) { - // null may be returned by reader.readRecord() if it encounters end-of-stream - return null; - } - - // Top level is always going to be a "Repository Record Update" record because we need a 'Union' type record at the - // top level that indicates which type of record we have. 
- final Record record = (Record) updateRecord.getFieldValue(RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V2); - - final String actionType = (String) record.getFieldValue(RepositoryRecordSchema.ACTION_TYPE_FIELD); - final UpdateType updateType = UpdateType.valueOf(actionType); - switch (updateType) { - case CREATE: - return createRecord(record); - case DELETE: - return deleteRecord(record); - case SWAP_IN: - return swapInRecord(record); - case SWAP_OUT: - return swapOutRecord(record); - case UPDATE: - return updateRecord(record); - default: - throw new IOException("Found unrecognized Update Type '" + actionType + "'"); - } - } - - - @SuppressWarnings("unchecked") - private StandardRepositoryRecord createRecord(final Record record) { - final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); - ffBuilder.id((Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID)); - ffBuilder.entryDate((Long) record.getFieldValue(FlowFileSchema.ENTRY_DATE)); - - final Long lastQueueDate = (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE); - final Long queueDateIndex = (Long) record.getFieldValue(FlowFileSchema.QUEUE_DATE_INDEX); - ffBuilder.lastQueued(lastQueueDate, queueDateIndex); - - final Long lineageStartDate = (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_DATE); - final Long lineageStartIndex = (Long) record.getFieldValue(FlowFileSchema.LINEAGE_START_INDEX); - ffBuilder.lineageStart(lineageStartDate, lineageStartIndex); - - populateContentClaim(ffBuilder, record); - ffBuilder.size((Long) record.getFieldValue(FlowFileSchema.FLOWFILE_SIZE)); - - ffBuilder.addAttributes((Map<String, String>) record.getFieldValue(FlowFileSchema.ATTRIBUTES)); - - final FlowFileRecord flowFileRecord = ffBuilder.build(); - - final String queueId = (String) record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER); - final FlowFileQueue queue = getFlowFileQueue(queueId); - - final StandardRepositoryRecord repoRecord = new 
StandardRepositoryRecord(queue, flowFileRecord); - requireFlowFileQueue(repoRecord, queueId); - return repoRecord; - } - - private void requireFlowFileQueue(final StandardRepositoryRecord repoRecord, final String queueId) { - if (queueId == null || queueId.trim().isEmpty()) { - logger.warn("{} does not have a Queue associated with it; this record will be discarded", repoRecord.getCurrent()); - repoRecord.markForAbort(); - } else if (repoRecord.getOriginalQueue() == null) { - logger.warn("{} maps to unknown Queue {}; this record will be discarded", repoRecord.getCurrent(), queueId); - repoRecord.markForAbort(); - } - } - - private void populateContentClaim(final StandardFlowFileRecord.Builder ffBuilder, final Record record) { - final Object claimMap = record.getFieldValue(FlowFileSchema.CONTENT_CLAIM); - if (claimMap == null) { - return; - } - - final Record claimRecord = (Record) claimMap; - final ContentClaim contentClaim = ContentClaimFieldMap.getContentClaim(claimRecord, resourceClaimManager); - final Long offset = ContentClaimFieldMap.getContentClaimOffset(claimRecord); - - ffBuilder.contentClaim(contentClaim); - ffBuilder.contentClaimOffset(offset); - } - - private RepositoryRecord updateRecord(final Record record) { - return createRecord(record); - } - - private RepositoryRecord deleteRecord(final Record record) { - final Long recordId = (Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID_FIELD); - final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId); - final FlowFileRecord flowFileRecord = ffBuilder.build(); - - final StandardRepositoryRecord repoRecord = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord); - repoRecord.markForDelete(); - return repoRecord; - } - - private RepositoryRecord swapInRecord(final Record record) { - final StandardRepositoryRecord repoRecord = createRecord(record); - final String swapLocation = (String) record.getFieldValue(new 
SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE)); - repoRecord.setSwapLocation(swapLocation); - - final String queueId = (String) record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER); - requireFlowFileQueue(repoRecord, queueId); - return repoRecord; - } - - private RepositoryRecord swapOutRecord(final Record record) { - final Long recordId = (Long) record.getFieldValue(RepositoryRecordSchema.RECORD_ID_FIELD); - final String queueId = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.QUEUE_IDENTIFIER, FieldType.STRING, Repetition.EXACTLY_ONE)); - final String swapLocation = (String) record.getFieldValue(new SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, Repetition.EXACTLY_ONE)); - final FlowFileQueue queue = getFlowFileQueue(queueId); - - final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - .id(recordId) - .build(); - - return new StandardRepositoryRecord(queue, flowFileRecord, swapLocation); - } - - @Override - public int getVersion() { - return MAX_ENCODING_VERSION; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java deleted file mode 100644 index a1d5173..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java +++ /dev/null @@ -1,341 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.repository; - -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; - -import org.apache.commons.lang3.builder.CompareToBuilder; -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; -import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; - -/** - * <p> - * A flow file is a logical notion of an item in a flow with its associated attributes and identity which can be used as a reference for its actual content. 
- * </p> - * - * <b>Immutable - Thread Safe</b> - * - */ -public final class StandardFlowFileRecord implements FlowFile, FlowFileRecord { - - private final long id; - private final long entryDate; - private final long lineageStartDate; - private final long lineageStartIndex; - private final long size; - private final long penaltyExpirationMs; - private final Map<String, String> attributes; - private final ContentClaim claim; - private final long claimOffset; - private final long lastQueueDate; - private final long queueDateIndex; - - private StandardFlowFileRecord(final Builder builder) { - this.id = builder.bId; - this.attributes = builder.bAttributes == null ? Collections.emptyMap() : builder.bAttributes; - this.entryDate = builder.bEntryDate; - this.lineageStartDate = builder.bLineageStartDate; - this.lineageStartIndex = builder.bLineageStartIndex; - this.penaltyExpirationMs = builder.bPenaltyExpirationMs; - this.size = builder.bSize; - this.claim = builder.bClaim; - this.claimOffset = builder.bClaimOffset; - this.lastQueueDate = builder.bLastQueueDate; - this.queueDateIndex = builder.bQueueDateIndex; - } - - @Override - public long getId() { - return id; - } - - @Override - public long getEntryDate() { - return entryDate; - } - - @Override - public long getLineageStartDate() { - return lineageStartDate; - } - - @Override - public Long getLastQueueDate() { - return lastQueueDate; - } - - @Override - public boolean isPenalized() { - return penaltyExpirationMs > 0 ? 
penaltyExpirationMs > System.currentTimeMillis() : false; - } - - @Override - public String getAttribute(final String key) { - return attributes.get(key); - } - - @Override - public long getSize() { - return size; - } - - @Override - public Map<String, String> getAttributes() { - return Collections.unmodifiableMap(this.attributes); - } - - @Override - public ContentClaim getContentClaim() { - return this.claim; - } - - @Override - public long getContentClaimOffset() { - return this.claimOffset; - } - - @Override - public long getLineageStartIndex() { - return lineageStartIndex; - } - - @Override - public long getQueueDateIndex() { - return queueDateIndex; - } - - /** - * Provides the natural ordering for FlowFile objects which is based on their identifier. - * - * @param other other - * @return standard compare contract - */ - @Override - public int compareTo(final FlowFile other) { - return new CompareToBuilder().append(id, other.getId()).toComparison(); - } - - @Override - public boolean equals(final Object other) { - if (this == other) { - return true; - } - if (!(other instanceof FlowFile)) { - return false; - } - final FlowFile otherRecord = (FlowFile) other; - return new EqualsBuilder().append(id, otherRecord.getId()).isEquals(); - } - - @Override - public String toString() { - final ToStringBuilder builder = new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE); - builder.append("uuid", getAttribute(CoreAttributes.UUID.key())); - builder.append("claim", claim == null ? 
"" : claim.toString()); - builder.append("offset", claimOffset); - builder.append("name", getAttribute(CoreAttributes.FILENAME.key())).append("size", size); - return builder.toString(); - } - - @Override - public int hashCode() { - return new HashCodeBuilder(7, 13).append(id).toHashCode(); - } - - public static final class Builder { - - private long bId; - private long bEntryDate = System.currentTimeMillis(); - private long bLineageStartDate = bEntryDate; - private long bLineageStartIndex = 0L; - private final Set<String> bLineageIdentifiers = new HashSet<>(); - private long bPenaltyExpirationMs = -1L; - private long bSize = 0L; - private ContentClaim bClaim = null; - private long bClaimOffset = 0L; - private long bLastQueueDate = System.currentTimeMillis(); - private long bQueueDateIndex = 0L; - private Map<String, String> bAttributes; - private boolean bAttributesCopied = false; - - public Builder id(final long id) { - bId = id; - return this; - } - - public Builder entryDate(final long epochMs) { - bEntryDate = epochMs; - return this; - } - - public Builder lineageStart(final long lineageStartDate, final long lineageStartIndex) { - bLineageStartDate = lineageStartDate; - bLineageStartIndex = lineageStartIndex; - return this; - } - - public Builder penaltyExpirationTime(final long epochMilliseconds) { - bPenaltyExpirationMs = epochMilliseconds; - return this; - } - - public Builder size(final long bytes) { - if (bytes >= 0) { - bSize = bytes; - } - return this; - } - - private Map<String, String> initializeAttributes() { - if (bAttributes == null) { - bAttributes = new HashMap<>(); - bAttributesCopied = true; - } else if (!bAttributesCopied) { - bAttributes = new HashMap<>(bAttributes); - bAttributesCopied = true; - } - - return bAttributes; - } - - public Builder addAttribute(final String key, final String value) { - if (key != null && value != null) { - initializeAttributes().put(FlowFile.KeyValidator.validateKey(key), value); - } - return this; - } - - public 
Builder addAttributes(final Map<String, String> attributes) { - final Map<String, String> initializedAttributes = initializeAttributes(); - - if (null != attributes) { - for (final String key : attributes.keySet()) { - FlowFile.KeyValidator.validateKey(key); - } - for (final Map.Entry<String, String> entry : attributes.entrySet()) { - final String key = entry.getKey(); - final String value = entry.getValue(); - if (key != null && value != null) { - initializedAttributes.put(key, value); - } - } - } - return this; - } - - public Builder removeAttributes(final String... keys) { - if (keys != null) { - for (final String key : keys) { - if (CoreAttributes.UUID.key().equals(key)) { - continue; - } - - initializeAttributes().remove(key); - } - } - return this; - } - - public Builder removeAttributes(final Set<String> keys) { - if (keys != null) { - for (final String key : keys) { - if (CoreAttributes.UUID.key().equals(key)) { - continue; - } - - initializeAttributes().remove(key); - } - } - return this; - } - - public Builder removeAttributes(final Pattern keyPattern) { - if (keyPattern != null) { - final Iterator<String> iterator = initializeAttributes().keySet().iterator(); - while (iterator.hasNext()) { - final String key = iterator.next(); - - if (CoreAttributes.UUID.key().equals(key)) { - continue; - } - - if (keyPattern.matcher(key).matches()) { - iterator.remove(); - } - } - } - return this; - } - - public Builder contentClaim(final ContentClaim claim) { - this.bClaim = claim; - return this; - } - - public Builder contentClaimOffset(final long offset) { - this.bClaimOffset = offset; - return this; - } - - public Builder lastQueued(final long lastQueueDate, final long queueDateIndex) { - this.bLastQueueDate = lastQueueDate; - this.bQueueDateIndex = queueDateIndex; - return this; - } - - public Builder fromFlowFile(final FlowFileRecord specFlowFile) { - if (specFlowFile == null) { - return this; - } - bId = specFlowFile.getId(); - bEntryDate = 
specFlowFile.getEntryDate(); - bLineageStartDate = specFlowFile.getLineageStartDate(); - bLineageStartIndex = specFlowFile.getLineageStartIndex(); - bLineageIdentifiers.clear(); - bPenaltyExpirationMs = specFlowFile.getPenaltyExpirationMillis(); - bSize = specFlowFile.getSize(); - bAttributes = specFlowFile.getAttributes(); - bAttributesCopied = false; - bClaim = specFlowFile.getContentClaim(); - bClaimOffset = specFlowFile.getContentClaimOffset(); - bLastQueueDate = specFlowFile.getLastQueueDate(); - bQueueDateIndex = specFlowFile.getQueueDateIndex(); - - return this; - } - - public FlowFileRecord build() { - return new StandardFlowFileRecord(this); - } - } - - @Override - public long getPenaltyExpirationMillis() { - return penaltyExpirationMs; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java deleted file mode 100644 index 8aa1caf..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
package org.apache.nifi.controller.repository;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.processor.Relationship;

/**
 * Tracks the state of a single FlowFile within a repository transaction: its original
 * (persisted) form, its in-progress "working" form, the queues it came from and is going to,
 * and the attribute changes made along the way.
 */
public class StandardRepositoryRecord implements RepositoryRecord {

    private RepositoryRecordType type = null;
    private FlowFileRecord workingFlowFileRecord = null;
    private Relationship transferRelationship = null;
    private FlowFileQueue destination = null;
    private final FlowFileRecord originalFlowFileRecord;
    private final FlowFileQueue originalQueue;
    private String swapLocation;
    // Only attributes whose value differs from the original are recorded here.
    private final Map<String, String> updatedAttributes = new HashMap<>();
    private final Map<String, String> originalAttributes;
    private List<ContentClaim> transientClaims;

    /**
     * Creates a new record which has no original claim or flow file - it is entirely new
     *
     * @param originalQueue queue
     */
    public StandardRepositoryRecord(final FlowFileQueue originalQueue) {
        this(originalQueue, null);
        this.type = RepositoryRecordType.CREATE;
    }

    /**
     * Creates a record based on given original items
     *
     * @param originalQueue queue
     * @param originalFlowFileRecord record
     */
    public StandardRepositoryRecord(final FlowFileQueue originalQueue, final FlowFileRecord originalFlowFileRecord) {
        this(originalQueue, originalFlowFileRecord, null);
        this.type = RepositoryRecordType.UPDATE;
    }

    public StandardRepositoryRecord(final FlowFileQueue originalQueue, final FlowFileRecord originalFlowFileRecord, final String swapLocation) {
        this.originalQueue = originalQueue;
        this.originalFlowFileRecord = originalFlowFileRecord;
        // The delegating constructors above overwrite this default with CREATE or UPDATE.
        this.type = RepositoryRecordType.SWAP_OUT;
        this.swapLocation = swapLocation;
        this.originalAttributes = originalFlowFileRecord == null ? Collections.emptyMap() : originalFlowFileRecord.getAttributes();
    }

    @Override
    public FlowFileQueue getDestination() {
        return destination;
    }

    public void setDestination(final FlowFileQueue destination) {
        this.destination = destination;
    }

    @Override
    public RepositoryRecordType getType() {
        return type;
    }

    FlowFileRecord getOriginal() {
        return originalFlowFileRecord;
    }

    @Override
    public String getSwapLocation() {
        return swapLocation;
    }

    public void setSwapLocation(final String swapLocation) {
        this.swapLocation = swapLocation;
        if (type != RepositoryRecordType.SWAP_OUT) {
            type = RepositoryRecordType.SWAP_IN; // we are swapping in a new record
        }
    }

    @Override
    public ContentClaim getOriginalClaim() {
        return (originalFlowFileRecord == null) ? null : originalFlowFileRecord.getContentClaim();
    }

    @Override
    public FlowFileQueue getOriginalQueue() {
        return originalQueue;
    }

    public void setWorking(final FlowFileRecord flowFile) {
        workingFlowFileRecord = flowFile;
    }

    public void setWorking(final FlowFileRecord flowFile, final String attributeKey, final String attributeValue) {
        workingFlowFileRecord = flowFile;

        // If setting attribute to same value as original, don't add to updated attributes
        final String currentValue = originalAttributes.get(attributeKey);
        if (currentValue == null || !currentValue.equals(attributeValue)) {
            updatedAttributes.put(attributeKey, attributeValue);
        }
    }

    public void setWorking(final FlowFileRecord flowFile, final Map<String, String> updatedAttribs) {
        workingFlowFileRecord = flowFile;

        for (final Map.Entry<String, String> entry : updatedAttribs.entrySet()) {
            // Record only attributes whose value actually changed from the original.
            final String currentValue = originalAttributes.get(entry.getKey());
            if (currentValue == null || !currentValue.equals(entry.getValue())) {
                updatedAttributes.put(entry.getKey(), entry.getValue());
            }
        }
    }

    @Override
    public boolean isAttributesChanged() {
        return !updatedAttributes.isEmpty();
    }

    public void markForAbort() {
        type = RepositoryRecordType.CONTENTMISSING;
    }

    @Override
    public boolean isMarkedForAbort() {
        // Enums are singletons, so identity comparison is correct and null-safe.
        return type == RepositoryRecordType.CONTENTMISSING;
    }

    public void markForDelete() {
        type = RepositoryRecordType.DELETE;
    }

    public boolean isMarkedForDelete() {
        return type == RepositoryRecordType.DELETE;
    }

    public void setTransferRelationship(final Relationship relationship) {
        transferRelationship = relationship;
    }

    public Relationship getTransferRelationship() {
        return transferRelationship;
    }

    FlowFileRecord getWorking() {
        return workingFlowFileRecord;
    }

    ContentClaim getWorkingClaim() {
        return (workingFlowFileRecord == null) ? null : workingFlowFileRecord.getContentClaim();
    }

    @Override
    public FlowFileRecord getCurrent() {
        // The working record, if present, supersedes the original.
        return (workingFlowFileRecord == null) ? originalFlowFileRecord : workingFlowFileRecord;
    }

    @Override
    public ContentClaim getCurrentClaim() {
        return (getCurrent() == null) ? null : getCurrent().getContentClaim();
    }

    @Override
    public long getCurrentClaimOffset() {
        return (getCurrent() == null) ? 0L : getCurrent().getContentClaimOffset();
    }

    boolean isWorking() {
        return (workingFlowFileRecord != null);
    }

    Map<String, String> getOriginalAttributes() {
        return originalAttributes;
    }

    Map<String, String> getUpdatedAttributes() {
        return updatedAttributes;
    }

    @Override
    public String toString() {
        return "StandardRepositoryRecord[UpdateType=" + getType() + ",Record=" + getCurrent() + "]";
    }

    @Override
    public List<ContentClaim> getTransientClaims() {
        return transientClaims == null ? Collections.emptyList() : Collections.unmodifiableList(transientClaims);
    }

    void addTransientClaim(final ContentClaim claim) {
        if (claim == null) {
            return;
        }

        // Lazily allocated: most records never accumulate transient claims.
        if (transientClaims == null) {
            transientClaims = new ArrayList<>();
        }
        transientClaims.add(claim);
    }
}
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadRepositoryRecordSerde.java +++ /dev/null @@ -1,517 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.nifi.controller.repository; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.HashMap; -import java.util.Map; - -import org.apache.nifi.controller.queue.FlowFileQueue; -import org.apache.nifi.controller.repository.claim.ContentClaim; -import org.apache.nifi.controller.repository.claim.ResourceClaim; -import org.apache.nifi.controller.repository.claim.ResourceClaimManager; -import org.apache.nifi.controller.repository.claim.StandardContentClaim; -import org.apache.nifi.flowfile.FlowFile; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.wali.SerDe; -import org.wali.UpdateType; - -public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<RepositoryRecord> { - private static final Logger logger = LoggerFactory.getLogger(WriteAheadRepositoryRecordSerde.class); - - private static final int CURRENT_ENCODING_VERSION = 9; - - public static final byte ACTION_CREATE = 0; - public static final byte ACTION_UPDATE = 1; - public static final byte ACTION_DELETE = 2; - public static final byte ACTION_SWAPPED_OUT = 3; - public static final byte ACTION_SWAPPED_IN = 4; - - private long recordsRestored = 0L; - private final ResourceClaimManager claimManager; - - public WriteAheadRepositoryRecordSerde(final ResourceClaimManager claimManager) { - this.claimManager = claimManager; - } - - @Override - public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out) throws IOException { - serializeEdit(previousRecordState, record, out, false); - } - - public void serializeEdit(final RepositoryRecord previousRecordState, final RepositoryRecord record, final DataOutputStream out, final boolean forceAttributesWritten) throws IOException { - if (record.isMarkedForAbort()) { - 
logger.warn("Repository Record {} is marked to be aborted; it will be persisted in the FlowFileRepository as a DELETE record", record); - out.write(ACTION_DELETE); - out.writeLong(getRecordIdentifier(record)); - serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out); - return; - } - - final UpdateType updateType = getUpdateType(record); - - if (updateType.equals(UpdateType.DELETE)) { - out.write(ACTION_DELETE); - out.writeLong(getRecordIdentifier(record)); - serializeContentClaim(record.getCurrentClaim(), record.getCurrentClaimOffset(), out); - return; - } - - // If there's a Destination Connection, that's the one that we want to associated with this record. - // However, on restart, we will restore the FlowFile and set this connection to its "originalConnection". - // If we then serialize the FlowFile again before it's transferred, it's important to allow this to happen, - // so we use the originalConnection instead - FlowFileQueue associatedQueue = record.getDestination(); - if (associatedQueue == null) { - associatedQueue = record.getOriginalQueue(); - } - - if (updateType.equals(UpdateType.SWAP_OUT)) { - out.write(ACTION_SWAPPED_OUT); - out.writeLong(getRecordIdentifier(record)); - out.writeUTF(associatedQueue.getIdentifier()); - out.writeUTF(getLocation(record)); - return; - } - - final FlowFile flowFile = record.getCurrent(); - final ContentClaim claim = record.getCurrentClaim(); - - switch (updateType) { - case UPDATE: - out.write(ACTION_UPDATE); - break; - case CREATE: - out.write(ACTION_CREATE); - break; - case SWAP_IN: - out.write(ACTION_SWAPPED_IN); - break; - default: - throw new AssertionError(); - } - - out.writeLong(getRecordIdentifier(record)); - out.writeLong(flowFile.getEntryDate()); - out.writeLong(flowFile.getLineageStartDate()); - out.writeLong(flowFile.getLineageStartIndex()); - - final Long queueDate = flowFile.getLastQueueDate(); - out.writeLong(queueDate == null ? 
System.currentTimeMillis() : queueDate); - out.writeLong(flowFile.getQueueDateIndex()); - out.writeLong(flowFile.getSize()); - - if (associatedQueue == null) { - logger.warn("{} Repository Record {} has no Connection associated with it; it will be destroyed on restart", - new Object[] {this, record}); - writeString("", out); - } else { - writeString(associatedQueue.getIdentifier(), out); - } - - serializeContentClaim(claim, record.getCurrentClaimOffset(), out); - - if (forceAttributesWritten || record.isAttributesChanged() || updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_IN) { - out.write(1); // indicate attributes changed - final Map<String, String> attributes = flowFile.getAttributes(); - out.writeInt(attributes.size()); - for (final Map.Entry<String, String> entry : attributes.entrySet()) { - writeString(entry.getKey(), out); - writeString(entry.getValue(), out); - } - } else { - out.write(0); // indicate attributes did not change - } - - if (updateType == UpdateType.SWAP_IN) { - out.writeUTF(record.getSwapLocation()); - } - } - - @Override - public RepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, RepositoryRecord> currentRecordStates, final int version) throws IOException { - final int action = in.read(); - final long recordId = in.readLong(); - if (action == ACTION_DELETE) { - final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId); - - if (version > 4) { - deserializeClaim(in, version, ffBuilder); - } - - final FlowFileRecord flowFileRecord = ffBuilder.build(); - final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord); - record.markForDelete(); - - return record; - } - - if (action == ACTION_SWAPPED_OUT) { - final String queueId = in.readUTF(); - final String location = in.readUTF(); - final FlowFileQueue queue = getFlowFileQueue(queueId); - - final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() - 
.id(recordId) - .build(); - - return new StandardRepositoryRecord(queue, flowFileRecord, location); - } - - final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); - final RepositoryRecord record = currentRecordStates.get(recordId); - ffBuilder.id(recordId); - if (record != null) { - ffBuilder.fromFlowFile(record.getCurrent()); - } - ffBuilder.entryDate(in.readLong()); - - if (version > 1) { - // read the lineage identifiers and lineage start date, which were added in version 2. - if (version < 9) { - final int numLineageIds = in.readInt(); - for (int i = 0; i < numLineageIds; i++) { - in.readUTF(); //skip identifiers - } - } - final long lineageStartDate = in.readLong(); - final long lineageStartIndex; - if (version > 7) { - lineageStartIndex = in.readLong(); - } else { - lineageStartIndex = 0L; - } - ffBuilder.lineageStart(lineageStartDate, lineageStartIndex); - - if (version > 5) { - final long lastQueueDate = in.readLong(); - final long queueDateIndex; - if (version > 7) { - queueDateIndex = in.readLong(); - } else { - queueDateIndex = 0L; - } - - ffBuilder.lastQueued(lastQueueDate, queueDateIndex); - } - } - - ffBuilder.size(in.readLong()); - final String connectionId = readString(in); - - logger.debug("{} -> {}", new Object[] {recordId, connectionId}); - - deserializeClaim(in, version, ffBuilder); - - // recover new attributes, if they changed - final int attributesChanged = in.read(); - if (attributesChanged == -1) { - throw new EOFException(); - } else if (attributesChanged == 1) { - final int numAttributes = in.readInt(); - final Map<String, String> attributes = new HashMap<>(); - for (int i = 0; i < numAttributes; i++) { - final String key = readString(in); - final String value = readString(in); - attributes.put(key, value); - } - - ffBuilder.addAttributes(attributes); - } else if (attributesChanged != 0) { - throw new IOException("Attribute Change Qualifier not found in stream; found value: " - + attributesChanged + " after 
successfully restoring " + recordsRestored + " records. The FlowFile Repository appears to be corrupt!"); - } - - final FlowFileRecord flowFile = ffBuilder.build(); - String swapLocation = null; - if (action == ACTION_SWAPPED_IN) { - swapLocation = in.readUTF(); - } - - final FlowFileQueue queue = getFlowFileQueue(connectionId); - final StandardRepositoryRecord standardRepoRecord = new StandardRepositoryRecord(queue, flowFile); - if (swapLocation != null) { - standardRepoRecord.setSwapLocation(swapLocation); - } - - if (connectionId.isEmpty()) { - logger.warn("{} does not have a Queue associated with it; this record will be discarded", flowFile); - standardRepoRecord.markForAbort(); - } else if (queue == null) { - logger.warn("{} maps to unknown Queue {}; this record will be discarded", flowFile, connectionId); - standardRepoRecord.markForAbort(); - } - - recordsRestored++; - return standardRepoRecord; - } - - @Override - public StandardRepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException { - final int action = in.read(); - if (action == -1) { - return null; - } - - final long recordId = in.readLong(); - if (action == ACTION_DELETE) { - final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId); - - if (version > 4) { - deserializeClaim(in, version, ffBuilder); - } - - final FlowFileRecord flowFileRecord = ffBuilder.build(); - final StandardRepositoryRecord record = new StandardRepositoryRecord((FlowFileQueue) null, flowFileRecord); - record.markForDelete(); - return record; - } - - // if action was not delete, it must be create/swap in - final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); - final long entryDate = in.readLong(); - - if (version > 1) { - // read the lineage identifiers and lineage start date, which were added in version 2. 
- if (version < 9) { - final int numLineageIds = in.readInt(); - for (int i = 0; i < numLineageIds; i++) { - in.readUTF(); //skip identifiers - } - } - - final long lineageStartDate = in.readLong(); - final long lineageStartIndex; - if (version > 7) { - lineageStartIndex = in.readLong(); - } else { - lineageStartIndex = 0L; - } - ffBuilder.lineageStart(lineageStartDate, lineageStartIndex); - - if (version > 5) { - final long lastQueueDate = in.readLong(); - final long queueDateIndex; - if (version > 7) { - queueDateIndex = in.readLong(); - } else { - queueDateIndex = 0L; - } - - ffBuilder.lastQueued(lastQueueDate, queueDateIndex); - } - } - - final long size = in.readLong(); - final String connectionId = readString(in); - - logger.debug("{} -> {}", new Object[] {recordId, connectionId}); - - ffBuilder.id(recordId); - ffBuilder.entryDate(entryDate); - ffBuilder.size(size); - - deserializeClaim(in, version, ffBuilder); - - final int attributesChanged = in.read(); - if (attributesChanged == 1) { - final int numAttributes = in.readInt(); - final Map<String, String> attributes = new HashMap<>(); - for (int i = 0; i < numAttributes; i++) { - final String key = readString(in); - final String value = readString(in); - attributes.put(key, value); - } - - ffBuilder.addAttributes(attributes); - } else if (attributesChanged == -1) { - throw new EOFException(); - } else if (attributesChanged != 0) { - throw new IOException("Attribute Change Qualifier not found in stream; found value: " - + attributesChanged + " after successfully restoring " + recordsRestored + " records"); - } - - final FlowFileRecord flowFile = ffBuilder.build(); - String swapLocation = null; - if (action == ACTION_SWAPPED_IN) { - swapLocation = in.readUTF(); - } - - final StandardRepositoryRecord record; - final FlowFileQueue queue = getFlowFileQueue(connectionId); - record = new StandardRepositoryRecord(queue, flowFile); - if (swapLocation != null) { - record.setSwapLocation(swapLocation); - } - - if 
(connectionId.isEmpty()) { - logger.warn("{} does not have a FlowFile Queue associated with it; this record will be discarded", flowFile); - record.markForAbort(); - } else if (queue == null) { - logger.warn("{} maps to unknown FlowFile Queue {}; this record will be discarded", flowFile, connectionId); - record.markForAbort(); - } - - recordsRestored++; - return record; - } - - @Override - public void serializeRecord(final RepositoryRecord record, final DataOutputStream out) throws IOException { - serializeEdit(null, record, out, true); - } - - private void serializeContentClaim(final ContentClaim claim, final long offset, final DataOutputStream out) throws IOException { - if (claim == null) { - out.write(0); - } else { - out.write(1); - - final ResourceClaim resourceClaim = claim.getResourceClaim(); - writeString(resourceClaim.getId(), out); - writeString(resourceClaim.getContainer(), out); - writeString(resourceClaim.getSection(), out); - out.writeLong(claim.getOffset()); - out.writeLong(claim.getLength()); - - out.writeLong(offset); - out.writeBoolean(resourceClaim.isLossTolerant()); - } - } - - private void deserializeClaim(final DataInputStream in, final int serializationVersion, final StandardFlowFileRecord.Builder ffBuilder) throws IOException { - // determine current Content Claim. 
- final int claimExists = in.read(); - if (claimExists == 1) { - final String claimId; - if (serializationVersion < 4) { - claimId = String.valueOf(in.readLong()); - } else { - claimId = readString(in); - } - - final String container = readString(in); - final String section = readString(in); - - final long resourceOffset; - final long resourceLength; - if (serializationVersion < 7) { - resourceOffset = 0L; - resourceLength = -1L; - } else { - resourceOffset = in.readLong(); - resourceLength = in.readLong(); - } - - final long claimOffset = in.readLong(); - - final boolean lossTolerant; - if (serializationVersion >= 3) { - lossTolerant = in.readBoolean(); - } else { - lossTolerant = false; - } - - final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, false); - final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset); - contentClaim.setLength(resourceLength); - - ffBuilder.contentClaim(contentClaim); - ffBuilder.contentClaimOffset(claimOffset); - } else if (claimExists == -1) { - throw new EOFException(); - } else if (claimExists != 0) { - throw new IOException("Claim Existence Qualifier not found in stream; found value: " - + claimExists + " after successfully restoring " + recordsRestored + " records"); - } - } - - private void writeString(final String toWrite, final OutputStream out) throws IOException { - final byte[] bytes = toWrite.getBytes("UTF-8"); - final int utflen = bytes.length; - - if (utflen < 65535) { - out.write(utflen >>> 8); - out.write(utflen); - out.write(bytes); - } else { - out.write(255); - out.write(255); - out.write(utflen >>> 24); - out.write(utflen >>> 16); - out.write(utflen >>> 8); - out.write(utflen); - out.write(bytes); - } - } - - private String readString(final InputStream in) throws IOException { - final Integer numBytes = readFieldLength(in); - if (numBytes == null) { - throw new EOFException(); - } - final byte[] bytes = new 
byte[numBytes]; - fillBuffer(in, bytes, numBytes); - return new String(bytes, "UTF-8"); - } - - private Integer readFieldLength(final InputStream in) throws IOException { - final int firstValue = in.read(); - final int secondValue = in.read(); - if (firstValue < 0) { - return null; - } - if (secondValue < 0) { - throw new EOFException(); - } - if (firstValue == 0xff && secondValue == 0xff) { - final int ch1 = in.read(); - final int ch2 = in.read(); - final int ch3 = in.read(); - final int ch4 = in.read(); - if ((ch1 | ch2 | ch3 | ch4) < 0) { - throw new EOFException(); - } - return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4; - } else { - return (firstValue << 8) + secondValue; - } - } - - private void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException { - int bytesRead; - int totalBytesRead = 0; - while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) { - totalBytesRead += bytesRead; - } - if (totalBytesRead != length) { - throw new EOFException(); - } - } - - @Override - public int getVersion() { - return CURRENT_ENCODING_VERSION; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/0207f21c/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java deleted file mode 100644 index 39a2591..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java +++ /dev/null @@ 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.controller.repository.claim;

import java.util.Objects;

/**
 * <p>
 * A ContentClaim is a reference to a given flow file's content. Multiple flow files may reference the same content by both having the same content claim.</p>
 *
 * <p>
 * Must be thread safe</p>
 *
 */
public final class StandardContentClaim implements ContentClaim, Comparable<ContentClaim> {

    private final ResourceClaim resourceClaim;
    private final long offset;
    // volatile: length is set after construction (once the content has been written)
    // and may be read from other threads
    private volatile long length;

    /**
     * @param resourceClaim the backing resource claim this content lives in
     * @param offset        byte offset of this content within the resource claim
     */
    public StandardContentClaim(final ResourceClaim resourceClaim, final long offset) {
        this.resourceClaim = resourceClaim;
        this.offset = offset;
        this.length = -1L; // -1 indicates the length is not yet known
    }

    public void setLength(final long length) {
        this.length = length;
    }

    /**
     * Hash is based only on the immutable fields (offset, resourceClaim) so it stays
     * stable while {@code length} is still being written. Produces the same value as
     * the previous hand-rolled formula; tolerates a null resourceClaim.
     */
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = prime; // folds the original no-op step (prime * 1)
        result = prime * result + Long.hashCode(offset);
        result = prime * result + Objects.hashCode(resourceClaim);
        return result;
    }

    /**
     * Two claims are equal when they reference the same offset within the same
     * resource claim. Accepts any {@link ContentClaim} implementation.
     * Fix: uses Objects.equals so a null resourceClaim no longer throws NPE,
     * keeping equals consistent with hashCode (which already handled null).
     */
    @Override
    public boolean equals(final Object obj) {
        if (this == obj) {
            return true;
        }

        // also covers obj == null
        if (!(obj instanceof ContentClaim)) {
            return false;
        }

        final ContentClaim other = (ContentClaim) obj;
        if (offset != other.getOffset()) {
            return false;
        }

        return Objects.equals(resourceClaim, other.getResourceClaim());
    }

    /**
     * Orders first by resource claim, then by offset within the resource.
     * Consistent with equals for non-null resource claims.
     */
    @Override
    public int compareTo(final ContentClaim o) {
        final int resourceComp = resourceClaim.compareTo(o.getResourceClaim());
        if (resourceComp != 0) {
            return resourceComp;
        }

        return Long.compare(offset, o.getOffset());
    }

    @Override
    public ResourceClaim getResourceClaim() {
        return resourceClaim;
    }

    @Override
    public long getOffset() {
        return offset;
    }

    @Override
    public long getLength() {
        return length;
    }

    @Override
    public String toString() {
        return "StandardContentClaim [resourceClaim=" + resourceClaim + ", offset=" + offset + ", length=" + length + "]";
    }
}
