Repository: nifi Updated Branches: refs/heads/master 2595d816c -> 067e9dfeb
NIFI-4003: Expose configuration option for cache size and duration NIFI-4003: Addressed remaining spots where client does not cache information This closes #1879. Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/067e9dfe Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/067e9dfe Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/067e9dfe Branch: refs/heads/master Commit: 067e9dfeb0a7830a13003a04e4cfc3fd800f019b Parents: 2595d81 Author: Mark Payne <[email protected]> Authored: Thu Jun 1 13:29:29 2017 -0400 Committer: Bryan Bende <[email protected]> Committed: Thu Jun 1 15:31:35 2017 -0400 ---------------------------------------------------------------------- .../nifi-hwx-schema-registry-service/pom.xml | 20 +++ .../hortonworks/HortonworksSchemaRegistry.java | 168 ++++++++++------- .../TestHortonworksSchemaRegistry.java | 180 +++++++++++++++++++ .../src/test/resources/empty-schema.avsc | 6 + 4 files changed, 314 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/067e9dfe/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml index 574831b..79dbc84 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml @@ -196,5 +196,25 @@ limitations under the License. <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes combine.children="append"> + <exclude>src/test/resources/empty-schema.avsc</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/067e9dfe/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java index a83327d..d2289a2 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java @@ -16,9 +16,7 @@ */ package org.apache.nifi.schemaregistry.hortonworks; -import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; @@ -64,15 +62,18 @@ import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry { private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); + private final ConcurrentMap<Tuple<SchemaIdentifier, String>, RecordSchema> schemaNameToSchemaMap = new ConcurrentHashMap<>(); - private final ConcurrentMap<String, Tuple<SchemaVersionInfo, Long>> schemaVersionCache = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, Tuple<SchemaVersionInfo, Long>> schemaVersionByNameCache = new ConcurrentHashMap<>(); + private final ConcurrentMap<SchemaVersionKey, Tuple<SchemaVersionInfo, Long>> schemaVersionByKeyCache = new ConcurrentHashMap<>(); private static final String LOGICAL_TYPE_DATE = "date"; private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis"; private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros"; private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis"; private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros"; - private static final long VERSION_INFO_CACHE_NANOS = TimeUnit.MINUTES.toNanos(1L); + + private volatile long versionInfoCacheNanos; static final PropertyDescriptor URL = new PropertyDescriptor.Builder() .name("url") @@ -83,33 +84,50 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme .required(true) .build(); + static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder() + .name("cache-size") + .displayName("Cache Size") + .description("Specifies how many Schemas should be cached from the Hortonworks Schema Registry") + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .defaultValue("1000") + .required(true) + .build(); + + static final PropertyDescriptor CACHE_EXPIRATION = new PropertyDescriptor.Builder() + .name("cache-expiration") + .displayName("Cache Expiration") + .description("Specifies how long a Schema that is cached should remain in the cache. Once this time period elapses, a " + + "cached version of a schema will no longer be used, and the service will have to communicate with the " + + "Hortonworks Schema Registry again in order to obtain the schema.") + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .defaultValue("1 hour") + .required(true) + .build(); - private static final List<PropertyDescriptor> propertyDescriptors = Collections.singletonList(URL); private volatile SchemaRegistryClient schemaRegistryClient; private volatile boolean initialized; private volatile Map<String, Object> schemaRegistryConfig; - public HortonworksSchemaRegistry() { - } - @OnEnabled public void enable(final ConfigurationContext context) throws InitializationException { schemaRegistryConfig = new HashMap<>(); + versionInfoCacheNanos = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS); + // The below properties may or may not need to be exposed to the end // user. We just need to watch usage patterns to see if sensible default // can satisfy NiFi requirements String urlValue = context.getProperty(URL).evaluateAttributeExpressions().getValue(); - if (urlValue == null || urlValue.trim().length() == 0){ - throw new IllegalArgumentException("'Schema Registry URL' must not be nul or empty."); + if (urlValue == null || urlValue.trim().isEmpty()) { + throw new IllegalArgumentException("'Schema Registry URL' must not be null or empty."); } schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), urlValue); schemaRegistryConfig.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_SIZE.name(), 10L); - schemaRegistryConfig.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name(), 5000L); - schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name(), 1000L); - schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name(), 60 * 60 * 1000L); + schemaRegistryConfig.put(SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name(), context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.SECONDS)); + schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name(), context.getProperty(CACHE_SIZE).asInteger()); + schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name(), context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.SECONDS)); } @@ -126,11 +144,15 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return propertyDescriptors; + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(URL); + properties.add(CACHE_SIZE); + properties.add(CACHE_EXPIRATION); + return properties; } - private synchronized SchemaRegistryClient getClient() { + protected synchronized SchemaRegistryClient getClient() { if (!initialized) { schemaRegistryClient = new SchemaRegistryClient(schemaRegistryConfig); initialized = true; @@ -142,14 +164,14 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme private SchemaVersionInfo getLatestSchemaVersionInfo(final SchemaRegistryClient client, final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException { try { // Try to fetch the SchemaVersionInfo from the cache. - final Tuple<SchemaVersionInfo, Long> timestampedVersionInfo = schemaVersionCache.get(schemaName); + final Tuple<SchemaVersionInfo, Long> timestampedVersionInfo = schemaVersionByNameCache.get(schemaName); // Determine if the timestampedVersionInfo is expired boolean fetch = false; if (timestampedVersionInfo == null) { fetch = true; } else { - final long minTimestamp = System.nanoTime() - VERSION_INFO_CACHE_NANOS; + final long minTimestamp = System.nanoTime() - versionInfoCacheNanos; fetch = timestampedVersionInfo.getValue() < minTimestamp; } @@ -166,7 +188,41 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme // Store new version in cache. final Tuple<SchemaVersionInfo, Long> tuple = new Tuple<>(versionInfo, System.nanoTime()); - schemaVersionCache.put(schemaName, tuple); + schemaVersionByNameCache.put(schemaName, tuple); + return versionInfo; + } catch (final SchemaNotFoundException e) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException(e); + } + } + + private SchemaVersionInfo getSchemaVersionInfo(final SchemaRegistryClient client, final SchemaVersionKey key) throws org.apache.nifi.schema.access.SchemaNotFoundException { + try { + // Try to fetch the SchemaVersionInfo from the cache. + final Tuple<SchemaVersionInfo, Long> timestampedVersionInfo = schemaVersionByKeyCache.get(key); + + // Determine if the timestampedVersionInfo is expired + boolean fetch = false; + if (timestampedVersionInfo == null) { + fetch = true; + } else { + final long minTimestamp = System.nanoTime() - versionInfoCacheNanos; + fetch = timestampedVersionInfo.getValue() < minTimestamp; + } + + // If not expired, use what we got from the cache + if (!fetch) { + return timestampedVersionInfo.getKey(); + } + + // schema version info was expired or not found in cache. Fetch from schema registry + final SchemaVersionInfo versionInfo = client.getSchemaVersionInfo(key); + if (versionInfo == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + key.getSchemaName() + "' and version " + key.getVersion()); + } + + // Store new version in cache. + final Tuple<SchemaVersionInfo, Long> tuple = new Tuple<>(versionInfo, System.nanoTime()); + schemaVersionByKeyCache.put(key, tuple); return versionInfo; } catch (final SchemaNotFoundException e) { throw new org.apache.nifi.schema.access.SchemaNotFoundException(e); @@ -211,58 +267,50 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme @Override - public String retrieveSchemaText(final long schemaId, final int version) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException { - try { - final SchemaRegistryClient client = getClient(); - final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId); - if (info == null) { - throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); - } - - final SchemaMetadata metadata = info.getSchemaMetadata(); - final String schemaName = metadata.getName(); + public String retrieveSchemaText(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException { + final SchemaRegistryClient client = getClient(); + final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId); + if (info == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); + } - final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version); - final SchemaVersionInfo versionInfo = client.getSchemaVersionInfo(schemaVersionKey); - if (versionInfo == null) { - throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); - } + final SchemaMetadata metadata = info.getSchemaMetadata(); + final String schemaName = metadata.getName(); - return versionInfo.getSchemaText(); - } catch (final SchemaNotFoundException e) { - throw new org.apache.nifi.schema.access.SchemaNotFoundException(e); + final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version); + final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, schemaVersionKey); + if (versionInfo == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); } + + return versionInfo.getSchemaText(); } @Override - public RecordSchema retrieveSchema(final long schemaId, final int version) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException { - try { - final SchemaRegistryClient client = getClient(); - final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId); - if (info == null) { - throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); - } + public RecordSchema retrieveSchema(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException { + final SchemaRegistryClient client = getClient(); + final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId); + if (info == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); + } - final SchemaMetadata metadata = info.getSchemaMetadata(); - final String schemaName = metadata.getName(); + final SchemaMetadata metadata = info.getSchemaMetadata(); + final String schemaName = metadata.getName(); - final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version); - final SchemaVersionInfo versionInfo = client.getSchemaVersionInfo(schemaVersionKey); - if (versionInfo == null) { - throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); - } + final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version); + final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, schemaVersionKey); + if (versionInfo == null) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); + } - final String schemaText = versionInfo.getSchemaText(); + final String schemaText = versionInfo.getSchemaText(); - final SchemaIdentifier schemaIdentifier = SchemaIdentifier.of(schemaName, schemaId, version); - final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText); - return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> { - final Schema schema = new Schema.Parser().parse(schemaText); - return createRecordSchema(schema, schemaText, schemaIdentifier); - }); - } catch (final SchemaNotFoundException e) { - throw new org.apache.nifi.schema.access.SchemaNotFoundException(e); - } + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.of(schemaName, schemaId, version); + final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText); + return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> { + final Schema schema = new Schema.Parser().parse(schemaText); + return createRecordSchema(schema, schemaText, schemaIdentifier); + }); } http://git-wip-us.apache.org/repos/asf/nifi/blob/067e9dfe/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java new file mode 100644 index 0000000..40f2ba7 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.schemaregistry.hortonworks; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; + +import java.lang.reflect.Constructor; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.util.MockConfigurationContext; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.hortonworks.registries.schemaregistry.SchemaCompatibility; +import com.hortonworks.registries.schemaregistry.SchemaMetadata; +import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo; +import com.hortonworks.registries.schemaregistry.SchemaVersionInfo; +import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient; +import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; + +public class TestHortonworksSchemaRegistry { + private HortonworksSchemaRegistry registry; + private SchemaRegistryClient client; + + private final Map<String, SchemaVersionInfo> schemaVersionInfoMap = new HashMap<>(); + private final Map<String, SchemaMetadataInfo> schemaMetadataInfoMap = new HashMap<>(); + + @Before + public void setup() throws SchemaNotFoundException { + schemaVersionInfoMap.clear(); + schemaMetadataInfoMap.clear(); + + client = mock(SchemaRegistryClient.class); + doAnswer(new Answer<SchemaVersionInfo>() { + @Override + public SchemaVersionInfo answer(final InvocationOnMock invocation) throws Throwable { + final String schemaName = invocation.getArgumentAt(0, String.class); + final SchemaVersionInfo info = schemaVersionInfoMap.get(schemaName); + + if (info == null) { + throw new SchemaNotFoundException(); + } + + return info; + } + }).when(client).getLatestSchemaVersionInfo(any(String.class)); + + doAnswer(new Answer<SchemaMetadataInfo>() { + @Override + public SchemaMetadataInfo answer(InvocationOnMock invocation) throws Throwable { + final String schemaName = invocation.getArgumentAt(0, String.class); + final SchemaMetadataInfo info = schemaMetadataInfoMap.get(schemaName); + + if (info == null) { + throw new SchemaNotFoundException(); + } + + return info; + } + }).when(client).getSchemaMetadataInfo(any(String.class)); + + registry = new HortonworksSchemaRegistry() { + @Override + protected synchronized SchemaRegistryClient getClient() { + return client; + } + }; + } + + @Test + public void testCacheUsed() throws Exception { + final String text = new String(Files.readAllBytes(Paths.get("src/test/resources/empty-schema.avsc"))); + final SchemaVersionInfo info = new SchemaVersionInfo(1, text, 2L, "description"); + schemaVersionInfoMap.put("unit-test", info); + + final SchemaMetadata metadata = new SchemaMetadata.Builder("unit-test") + .compatibility(SchemaCompatibility.NONE) + .evolve(true) + .schemaGroup("group") + .type("AVRO") + .build(); + + final Constructor<SchemaMetadataInfo> ctr = SchemaMetadataInfo.class.getDeclaredConstructor(SchemaMetadata.class, Long.class, Long.class); + ctr.setAccessible(true); + + final SchemaMetadataInfo schemaMetadataInfo = ctr.newInstance(metadata, 1L, System.currentTimeMillis()); + + schemaMetadataInfoMap.put("unit-test", schemaMetadataInfo); + + final Map<PropertyDescriptor, String> properties = new HashMap<>(); + properties.put(HortonworksSchemaRegistry.URL, "http://localhost:44444"); + properties.put(HortonworksSchemaRegistry.CACHE_EXPIRATION, "5 mins"); + properties.put(HortonworksSchemaRegistry.CACHE_SIZE, "1000"); + + final ConfigurationContext configurationContext = new MockConfigurationContext(properties, null); + registry.enable(configurationContext); + + for (int i = 0; i < 10000; i++) { + final RecordSchema schema = registry.retrieveSchema("unit-test"); + assertNotNull(schema); + } + + Mockito.verify(client, Mockito.times(1)).getLatestSchemaVersionInfo(any(String.class)); + } + + @Test + @Ignore("This can be useful for manual testing/debugging, but will keep ignored for now because we don't want automated builds to run this, since it depends on timing") + public void testCacheExpires() throws Exception { + final String text = new String(Files.readAllBytes(Paths.get("src/test/resources/empty-schema.avsc"))); + final SchemaVersionInfo info = new SchemaVersionInfo(1, text, 2L, "description"); + schemaVersionInfoMap.put("unit-test", info); + + final SchemaMetadata metadata = new SchemaMetadata.Builder("unit-test") + .compatibility(SchemaCompatibility.NONE) + .evolve(true) + .schemaGroup("group") + .type("AVRO") + .build(); + + final Constructor<SchemaMetadataInfo> ctr = SchemaMetadataInfo.class.getDeclaredConstructor(SchemaMetadata.class, Long.class, Long.class); + ctr.setAccessible(true); + + final SchemaMetadataInfo schemaMetadataInfo = ctr.newInstance(metadata, 1L, System.currentTimeMillis()); + + schemaMetadataInfoMap.put("unit-test", schemaMetadataInfo); + + final Map<PropertyDescriptor, String> properties = new HashMap<>(); + properties.put(HortonworksSchemaRegistry.URL, "http://localhost:44444"); + properties.put(HortonworksSchemaRegistry.CACHE_EXPIRATION, "1 sec"); + properties.put(HortonworksSchemaRegistry.CACHE_SIZE, "1000"); + + final ConfigurationContext configurationContext = new MockConfigurationContext(properties, null); + registry.enable(configurationContext); + + for (int i = 0; i < 2; i++) { + final RecordSchema schema = registry.retrieveSchema("unit-test"); + assertNotNull(schema); + } + + Mockito.verify(client, Mockito.times(1)).getLatestSchemaVersionInfo(any(String.class)); + + Thread.sleep(2000L); + + for (int i = 0; i < 2; i++) { + final RecordSchema schema = registry.retrieveSchema("unit-test"); + assertNotNull(schema); + } + + Mockito.verify(client, Mockito.times(2)).getLatestSchemaVersionInfo(any(String.class)); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/067e9dfe/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/resources/empty-schema.avsc ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/resources/empty-schema.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/resources/empty-schema.avsc new file mode 100644 index 0000000..67098d0 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/resources/empty-schema.avsc @@ -0,0 +1,6 @@ +{ + "name": "unitTest", + "namespace": "org.apache.nifi", + "type": "record", + "fields": [] +} \ No newline at end of file
