This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-6973 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 18ccf564e1bb34dba891c75681474f572a35fcbf Author: zhouxh <[email protected]> AuthorDate: Tue Sep 10 16:47:12 2019 -0700 GEODE-6973: Use cachelistener to synchronize typeToId with IdToType Co-authored-by: Xiaojian Zhou <[email protected]> Co-authored-by: Donal Evans <[email protected]> --- .../geode/pdx/PdxTypeGenerationDUnitTest.java | 234 +++++++++++++++++++++ .../org/apache/geode/pdx/jsonStrings/testJSON.txt | 43 ++++ .../apache/geode/pdx/JSONFormatterJUnitTest.java | 26 ++- .../PeerTypeRegistrationIntegrationTest.java | 92 ++++++++ .../geode/internal/pdx/jsonStrings/testJSON.txt | 43 ++++ .../geode/pdx/internal/PeerTypeRegistration.java | 166 +++++++++++---- .../pdx/internal/PeerTypeRegistrationTest.java | 4 + 7 files changed, 556 insertions(+), 52 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/pdx/PdxTypeGenerationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/pdx/PdxTypeGenerationDUnitTest.java new file mode 100644 index 0000000..457d825 --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/pdx/PdxTypeGenerationDUnitTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.pdx; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.Properties; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import util.TestException; + +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.pdx.internal.PeerTypeRegistration; +import org.apache.geode.test.dunit.AsyncInvocation; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +public class PdxTypeGenerationDUnitTest { + + private MemberVM locator, server1, server2; + private static final int numOfTypes = 15; + private static final int numOfEnums = 10; + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(); + + @Before + public void before() { + Properties props = new Properties(); + props.setProperty("log-level", "WARN"); + + locator = cluster.startLocatorVM(0, props); + + int locatorPort1 = locator.getPort(); + server1 = cluster.startServerVM(1, + x -> x.withProperties(props).withConnectionToLocator(locatorPort1)); + + int locatorPort2 = locator.getPort(); + server2 = cluster.startServerVM(2, + x -> x.withProperties(props).withConnectionToLocator(locatorPort2)); + } + + @Test + public void testLocalMapsRecoveredAfterServerRestart() { + createPdxOnServer(server1, numOfTypes, numOfEnums); + + server2.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + PeerTypeRegistration registration = + (PeerTypeRegistration) (cache.getPdxRegistry().getTypeRegistration()); + + assertThat(registration.getLocalSize()).isEqualTo(numOfTypes + numOfEnums); + assertThat(registration.getTypeToIdSize()).isEqualTo(0); + assertThat(registration.getEnumToIdSize()).isEqualTo(0); + + }); + + server2.stop(false); + 
Properties props = new Properties(); + props.setProperty("log-level", "WARN"); + int locatorPort1 = locator.getPort(); + server2 = cluster.startServerVM(2, + x -> x.withProperties(props).withConnectionToLocator(locatorPort1)); + + server2.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + PeerTypeRegistration registration = + (PeerTypeRegistration) (cache.getPdxRegistry().getTypeRegistration()); + + assertThat(registration.getLocalSize()).isEqualTo(numOfTypes + numOfEnums); + assertThat(registration.getTypeToIdSize()).isEqualTo(numOfTypes); + assertThat(registration.getEnumToIdSize()).isEqualTo(numOfEnums); + }); + } + + @Test + public void definingNewTypeUpdatesLocalMaps() { + createPdxOnServer(server1, numOfTypes, numOfEnums); + + server2.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + PeerTypeRegistration registration = + (PeerTypeRegistration) (cache.getPdxRegistry().getTypeRegistration()); + + assertThat(registration.getLocalSize()).isEqualTo(numOfTypes + numOfEnums); + assertThat(registration.getTypeToIdSize()).isEqualTo(0); + assertThat(registration.getEnumToIdSize()).isEqualTo(0); + + // Creating a new PdxType to trigger the pending local maps to be flushed + JSONFormatter.fromJSON("{\"fieldName\": \"value\"}"); + + assertThat(registration.getLocalSize()).isEqualTo(numOfTypes + numOfEnums + 1); + assertThat(registration.getTypeToIdSize()).isEqualTo(numOfTypes + 1); + assertThat(registration.getEnumToIdSize()).isEqualTo(numOfEnums); + }); + } + + @Test + public void testNoConflictsWhenGeneratingPdxTypesFromJSONOnMultipleServers() { + int repeats = 10000; + + AsyncInvocation invocation1 = server1.invokeAsync(() -> { + for (int i = 0; i < repeats; ++i) { + JSONFormatter.fromJSON("{\"counter" + i + "\": " + i + "}"); + } + }); + AsyncInvocation invocation2 = server2.invokeAsync(() -> { + for (int i = 0; i < repeats; ++i) { + JSONFormatter.fromJSON("{\"counter" + i + "\": " + i + "}"); + } + }); + + try { + 
invocation1.await(); + invocation2.await(); + } catch (Exception ex) { + throw new TestException("Exception while awaiting async invocation: " + ex); + } + + server1.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + int numberOfTypesInRegion = cache.getPdxRegistry().getTypeRegistration().getLocalSize(); + int numberOfTypesInLocalMap = + ((PeerTypeRegistration) cache.getPdxRegistry().getTypeRegistration()).getTypeToIdSize(); + + assertThat(numberOfTypesInRegion) + .withFailMessage("Expected number of PdxTypes in region to be %s but was %s", + repeats, numberOfTypesInRegion) + .isEqualTo(repeats); + + assertThat(numberOfTypesInLocalMap) + .withFailMessage("Expected number of PdxTypes in local map to be %s but was %s", + repeats, numberOfTypesInLocalMap) + .isEqualTo(repeats); + }); + + server2.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + int numberOfTypesInRegion = cache.getRegion(PeerTypeRegistration.REGION_FULL_PATH).size(); + int numberOfTypesInLocalMap = + ((PeerTypeRegistration) cache.getPdxRegistry().getTypeRegistration()).getTypeToIdSize(); + + assertThat(numberOfTypesInRegion) + .withFailMessage("Expected number of PdxTypes in region to be %s but was %s", + repeats, numberOfTypesInRegion) + .isEqualTo(repeats); + + assertThat(numberOfTypesInLocalMap) + .withFailMessage("Expected number of PdxTypes in local map to be %s but was %s", + repeats, numberOfTypesInLocalMap) + .isEqualTo(repeats); + }); + } + + @Test + public void testEnumsAndPdxTypesCreatedOnClientAreEnteredIntoTypeRegistry() throws Exception { + final String regionName = "regionName"; + server1.invoke(() -> { + ClusterStartupRule.getCache().createRegionFactory().setDataPolicy( + DataPolicy.REPLICATE).create(regionName); + }); + server2.invoke(() -> { + ClusterStartupRule.getCache().createRegionFactory().setDataPolicy( + DataPolicy.REPLICATE).create(regionName); + }); + int port = locator.getPort(); + + Properties props = new Properties(); + 
props.setProperty("log-level", "WARN"); + ClientVM client = cluster.startClientVM(3, + cf -> cf.withLocatorConnection(port).withPoolSubscription(true).withProperties(props)); + + client.invoke(() -> { + ClientCache cache = ClusterStartupRule.getClientCache(); + cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName); + + for (int i = 0; i < numOfTypes; ++i) { + JSONFormatter.fromJSON("{\"counter" + i + "\": " + i + "}"); + } + for (int i = 0; i < numOfEnums; ++i) { + cache.createPdxEnum("ClassName", "EnumName" + i, i); + } + }); + + server1.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + assertThat(cache).isNotNull(); + int numberOfTypesInRegion = cache.getPdxRegistry().getTypeRegistration().getLocalSize(); + + assertThat(numberOfTypesInRegion) + .withFailMessage("Expected number of PdxTypes and Enums in region to be %s but was %s", + numOfEnums, numberOfTypesInRegion) + .isEqualTo(numOfTypes + numOfEnums); + }); + } + + private void createPdxOnServer(MemberVM server, int numOfTypes, int numOfEnums) { + server.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + + for (int i = 0; i < numOfTypes; ++i) { + JSONFormatter.fromJSON("{\"counter" + i + "\": " + i + "}"); + } + for (int i = 0; i < numOfEnums; ++i) { + cache.createPdxEnum("ClassName", "EnumName" + i, i); + } + PeerTypeRegistration registration = + (PeerTypeRegistration) (cache.getPdxRegistry().getTypeRegistration()); + + assertThat(registration.getLocalSize()).isEqualTo(numOfTypes + numOfEnums); + assertThat(registration.getTypeToIdSize()).isEqualTo(numOfTypes); + assertThat(registration.getEnumToIdSize()).isEqualTo(numOfEnums); + }); + } + +} diff --git a/geode-core/src/distributedTest/resources/org/apache/geode/pdx/jsonStrings/testJSON.txt b/geode-core/src/distributedTest/resources/org/apache/geode/pdx/jsonStrings/testJSON.txt new file mode 100644 index 0000000..735fa70 --- /dev/null +++ 
b/geode-core/src/distributedTest/resources/org/apache/geode/pdx/jsonStrings/testJSON.txt @@ -0,0 +1,43 @@ +{ + "configGlossary:installationAt": "Philadelphia, PA", + "configGlossary:adminEmail": "[email protected]", + "configGlossary:poweredBy": "Cofax", + "configGlossary:poweredByIcon": "/images/cofax.gif", + "configGlossary:staticPath": "/content/static", + "templateProcessorClass": "org.cofax.WysiwygTemplate", + "templateLoaderClass": "org.cofax.FilesTemplateLoader", + "templatePath": "templates", + "templateOverridePath": "", + "defaultListTemplate": "listTemplate.htm", + "defaultFileTemplate": "articleTemplate.htm", + "useJSP": false, + "jspListTemplate": "listTemplate.jsp", + "jspFileTemplate": "articleTemplate.jsp", + "cachePackageTagsTrack": 200, + "cachePackageTagsStore": 200, + "cachePackageTagsRefresh": 60, + "cacheTemplatesTrack": 100, + "cacheTemplatesStore": 50, + "cacheTemplatesRefresh": 15, + "cachePagesTrack": 200, + "cachePagesStore": 100, + "cachePagesRefresh": 10, + "cachePagesDirtyRead": 10, + "searchEngineListTemplate": "forSearchEnginesList.htm", + "searchEngineFileTemplate": "forSearchEngines.htm", + "searchEngineRobotsDb": "WEB-INF/robots.db", + "useDataStore": true, + "dataStoreClass": "org.cofax.SqlDataStore", + "redirectionClass": "org.cofax.SqlRedirection", + "dataStoreName": "cofax", + "dataStoreDriver": "com.microsoft.jdbc.sqlserver.SQLServerDriver", + "dataStoreUrl": "jdbc:microsoft:sqlserver://LOCALHOST:1433;DatabaseName=goon", + "dataStoreUser": "sa", + "dataStorePassword": "dataStoreTestQuery", + "dataStoreTestQuery": "SET NOCOUNT ON;select test='test';", + "dataStoreLogFile": "/usr/local/tomcat/logs/datastore.log", + "dataStoreInitConns": 10, + "dataStoreMaxConns": 100, + "dataStoreConnUsageLimit": 100, + "dataStoreLogLevel": "debug", + "maxUrlLength": 500} diff --git a/geode-core/src/integrationTest/java/org/apache/geode/pdx/JSONFormatterJUnitTest.java 
b/geode-core/src/integrationTest/java/org/apache/geode/pdx/JSONFormatterJUnitTest.java index 0995fb8..222fac7 100755 --- a/geode-core/src/integrationTest/java/org/apache/geode/pdx/JSONFormatterJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/pdx/JSONFormatterJUnitTest.java @@ -39,14 +39,18 @@ import org.apache.geode.test.junit.categories.SerializationTest; @Category({SerializationTest.class}) public class JSONFormatterJUnitTest { public static final String REGION_NAME = "primitiveKVStore"; + private Cache cache; + private Region<Object, Object> region; @Before public void setUp() throws Exception { - this.cache = new CacheFactory().set(MCAST_PORT, "0").setPdxReadSerialized(true).create(); + this.cache = new CacheFactory().set(MCAST_PORT, "0") + .set("log-level", "WARN").setPdxReadSerialized(true).create(); - region = cache.createRegionFactory().setDataPolicy(DataPolicy.PARTITION).create(REGION_NAME); + region = cache.createRegionFactory().setDataPolicy(DataPolicy.PARTITION) + .create(REGION_NAME); } @@ -89,7 +93,8 @@ public class JSONFormatterJUnitTest { // 2. Get the JSON string from actualTestObject using jackson ObjectMapper. 
ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setDateFormat(new SimpleDateFormat("MM/dd/yyyy")); - objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + objectMapper + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); String json = objectMapper.writeValueAsString(expectedTestObject); @@ -111,14 +116,16 @@ public class JSONFormatterJUnitTest { } @Test - public void verifyJsonToPdxInstanceConversionWithJSONFormatter() throws Exception { + public void verifyJsonToPdxInstanceConversionWithJSONFormatter() + throws Exception { TestObjectForJSONFormatter expectedTestObject = new TestObjectForJSONFormatter(); expectedTestObject.defaultInitialization(); // 1.gets pdxInstance using R.put() and R.get() region.put("501", expectedTestObject); Object receivedObject = region.get("501"); - assertEquals("receivedObject is expected to be of type PdxInstance", PdxInstanceImpl.class, + assertEquals("receivedObject is expected to be of type PdxInstance", + PdxInstanceImpl.class, receivedObject.getClass()); PdxInstance expectedPI = (PdxInstance) receivedObject; @@ -140,7 +147,8 @@ public class JSONFormatterJUnitTest { assertEquals("receivedObject is expected to be of type PdxInstance", TestObjectForJSONFormatter.class, actualTestObject.getClass()); - assertEquals("actualTestObject and expectedTestObject should be equal", expectedTestObject, + assertEquals("actualTestObject and expectedTestObject should be equal", + expectedTestObject, actualTestObject); } @@ -153,7 +161,8 @@ public class JSONFormatterJUnitTest { int pdxTypes = 0; if (cache.getRegion(PeerTypeRegistration.REGION_FULL_PATH) != null) { - pdxTypes = cache.getRegion(PeerTypeRegistration.REGION_FULL_PATH).keySet().size(); + pdxTypes = cache.getRegion(PeerTypeRegistration.REGION_FULL_PATH).keySet() + .size(); } String js = "{name:\"ValueExist\", age:14}"; @@ -197,7 +206,8 @@ public class JSONFormatterJUnitTest { int pdxTypes = 0; if 
(cache.getRegion(PeerTypeRegistration.REGION_FULL_PATH) != null) { - pdxTypes = cache.getRegion(PeerTypeRegistration.REGION_FULL_PATH).keySet().size(); + pdxTypes = cache.getRegion(PeerTypeRegistration.REGION_FULL_PATH) + .keySet().size(); } String js2 = "{c:\"c' go\", bb:23, b:\"b\", age:14 }"; diff --git a/geode-core/src/integrationTest/java/org/apache/geode/pdx/internal/PeerTypeRegistrationIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/pdx/internal/PeerTypeRegistrationIntegrationTest.java new file mode 100644 index 0000000..680e723 --- /dev/null +++ b/geode-core/src/integrationTest/java/org/apache/geode/pdx/internal/PeerTypeRegistrationIntegrationTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.pdx.internal; + +import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.test.junit.categories.SerializationTest; + +@Category({SerializationTest.class}) +public class PeerTypeRegistrationIntegrationTest { + private Cache cache; + private PeerTypeRegistration registration; + + @Before + public void setUp() { + cache = new CacheFactory().set(MCAST_PORT, "0") + .set("log-level", "WARN").setPdxReadSerialized(true).create(); + registration = new PeerTypeRegistration((InternalCache) cache); + registration.initialize(); + } + + @After + public void tearDown() { + try { + if (!cache.isClosed()) { + cache.close(); + } + } catch (Exception e) { + throw new AssertionError(e); + } + } + + @Test + public void testDefineType() { + + PdxType firstType = new PdxType(); + firstType.setClassName("Mock.Test.Class.One"); + + PdxType secondType = new PdxType(); + secondType.setClassName("Mock.Test.Class.Two"); + + assertThat(registration.getLocalSize()).isEqualTo(0); + assertThat(registration.getTypeToIdSize()).isEqualTo(0); + + int firstTypeId1 = registration.defineType(firstType); + + // Confirm the PdxType was added to the region and the local map + assertThat(registration.getLocalSize()).isEqualTo(1); + assertThat(registration.getTypeToIdSize()).isEqualTo(1); + + firstType.setTypeId(firstTypeId1 - 1); + int firstTypeId2 = registration.defineType(firstType); + + // Defining an existing type with a different TypeId returns the existing TypeId + assertThat(firstTypeId1).isEqualTo(firstTypeId2); + assertThat(registration.getType(firstTypeId2)).isEqualTo(firstType); + + // Defining an 
existing type does not add a new type to the region or local map + assertThat(registration.getLocalSize()).isEqualTo(1); + assertThat(registration.getTypeToIdSize()).isEqualTo(1); + + secondType.setTypeId(firstTypeId1); + + int secondTypeId = registration.defineType(secondType); + + // Defining a new type with an existing TypeId does not overwrite the existing type + assertThat(secondTypeId).isNotEqualTo(firstTypeId1); + assertThat(registration.getLocalSize()).isEqualTo(2); + assertThat(registration.getTypeToIdSize()).isEqualTo(2); + } +} diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/internal/pdx/jsonStrings/testJSON.txt b/geode-core/src/integrationTest/resources/org/apache/geode/internal/pdx/jsonStrings/testJSON.txt new file mode 100644 index 0000000..735fa70 --- /dev/null +++ b/geode-core/src/integrationTest/resources/org/apache/geode/internal/pdx/jsonStrings/testJSON.txt @@ -0,0 +1,43 @@ +{ + "configGlossary:installationAt": "Philadelphia, PA", + "configGlossary:adminEmail": "[email protected]", + "configGlossary:poweredBy": "Cofax", + "configGlossary:poweredByIcon": "/images/cofax.gif", + "configGlossary:staticPath": "/content/static", + "templateProcessorClass": "org.cofax.WysiwygTemplate", + "templateLoaderClass": "org.cofax.FilesTemplateLoader", + "templatePath": "templates", + "templateOverridePath": "", + "defaultListTemplate": "listTemplate.htm", + "defaultFileTemplate": "articleTemplate.htm", + "useJSP": false, + "jspListTemplate": "listTemplate.jsp", + "jspFileTemplate": "articleTemplate.jsp", + "cachePackageTagsTrack": 200, + "cachePackageTagsStore": 200, + "cachePackageTagsRefresh": 60, + "cacheTemplatesTrack": 100, + "cacheTemplatesStore": 50, + "cacheTemplatesRefresh": 15, + "cachePagesTrack": 200, + "cachePagesStore": 100, + "cachePagesRefresh": 10, + "cachePagesDirtyRead": 10, + "searchEngineListTemplate": "forSearchEnginesList.htm", + "searchEngineFileTemplate": "forSearchEngines.htm", + "searchEngineRobotsDb": 
"WEB-INF/robots.db", + "useDataStore": true, + "dataStoreClass": "org.cofax.SqlDataStore", + "redirectionClass": "org.cofax.SqlRedirection", + "dataStoreName": "cofax", + "dataStoreDriver": "com.microsoft.jdbc.sqlserver.SQLServerDriver", + "dataStoreUrl": "jdbc:microsoft:sqlserver://LOCALHOST:1433;DatabaseName=goon", + "dataStoreUser": "sa", + "dataStorePassword": "dataStoreTestQuery", + "dataStoreTestQuery": "SET NOCOUNT ON;select test='test';", + "dataStoreLogFile": "/usr/local/tomcat/logs/datastore.log", + "dataStoreInitConns": 10, + "dataStoreMaxConns": 100, + "dataStoreConnUsageLimit": 100, + "dataStoreLogLevel": "debug", + "maxUrlLength": 500} diff --git a/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java b/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java index 047f434..aa1d471 100644 --- a/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java +++ b/geode-core/src/main/java/org/apache/geode/pdx/internal/PeerTypeRegistration.java @@ -90,14 +90,7 @@ public class PeerTypeRegistration implements TypeRegistration { */ private Region<Object/* Integer or EnumCode */, Object/* PdxType or enum info */> idToType; - /** - * This map serves two purposes. It lets us look up an id based on a type, if we previously found - * that type in the region. And, if a type is present in this map, that means we read the type - * while holding the dlock, which means the type was distributed to all members. 
- */ - private final Map<PdxType, Integer> typeToId = Collections.synchronizedMap(new HashMap<>()); - - private final Map<EnumInfo, EnumId> enumToId = Collections.synchronizedMap(new HashMap<>()); + private LocalReverseMap localReverseMap = new LocalReverseMap(); private final Map<String, CopyOnWriteHashSet<PdxType>> classToType = new CopyOnWriteHashMap<>(); @@ -170,8 +163,9 @@ public class PeerTypeRegistration implements TypeRegistration { verifyConfiguration(); // update a local map with the pdxtypes registered Object value = event.getNewValue(); - if (value instanceof PdxType) { - updateClassToTypeMap((PdxType) value); + Object key = event.getKey(); + if (value != null) { + updateLocalMaps(key, value); } } }); @@ -221,6 +215,8 @@ public class PeerTypeRegistration implements TypeRegistration { if (!getIdToType().isEmpty()) { verifyConfiguration(); } + + buildTypeToIdFromIdToType(); } protected DistributedLockService getLockService() { @@ -359,26 +355,30 @@ public class PeerTypeRegistration implements TypeRegistration { public int defineType(PdxType newType) { statistics.typeDefined(); verifyConfiguration(); - Integer existingId = typeToId.get(newType); + Integer existingId = localReverseMap.checkIfExistsInLocal(newType); if (existingId != null) { return existingId; } lock(); try { - int id = getExistingIdForType(newType); - if (id != -1) { - return id; + if (localReverseMap.shouldReloadFromRegion()) { + buildTypeToIdFromIdToType(); + } + localReverseMap.flushLocalMap(); + // double check if my type is in region in case the typeToId map has been updated while + // waiting to obtain a lock + existingId = localReverseMap.checkIfExistsInLocal(newType); + if (existingId != null) { + return existingId; } - id = allocateTypeId(newType); + int id = allocateTypeId(newType); newType.setTypeId(id); updateIdToTypeRegion(newType); - - typeToId.put(newType, id); - return newType.getTypeId(); } finally { + localReverseMap.flushLocalMap(); unlock(); } } @@ -543,39 +543,28 @@ 
public class PeerTypeRegistration implements TypeRegistration { } } - /** Should be called holding the dlock */ - private int getExistingIdForType(PdxType newType) { + private void buildTypeToIdFromIdToType() { int totalPdxTypeIdInDS = 0; TXStateProxy currentState = suspendTX(); try { - int result = -1; for (Map.Entry<Object, Object> entry : getIdToType().entrySet()) { Object v = entry.getValue(); Object k = entry.getKey(); - if (k instanceof EnumId) { - EnumId id = (EnumId) k; - EnumInfo info = (EnumInfo) v; - enumToId.put(info, id); - } else { + if (v instanceof PdxType) { PdxType foundType = (PdxType) v; Integer id = (Integer) k; int tmpDsId = PLACE_HOLDER_FOR_DS_ID & id; if (tmpDsId == typeIdPrefix) { totalPdxTypeIdInDS++; - } - - typeToId.put(foundType, id); - if (foundType.equals(newType)) { - result = foundType.getTypeId(); + if (totalPdxTypeIdInDS >= MAX_TYPE_ID) { + throw new InternalGemFireError( + "Used up all of the PDX type ids for this distributed system. The maximum number of PDX types is " + + MAX_TYPE_ID); + } } } + localReverseMap.save(k, v, false); } - if (totalPdxTypeIdInDS == MAX_TYPE_ID) { - throw new InternalGemFireError( - "Used up all of the PDX type ids for this distributed system. 
The maximum number of PDX types is " - + MAX_TYPE_ID); - } - return result; } finally { resumeTX(currentState); } @@ -593,7 +582,6 @@ public class PeerTypeRegistration implements TypeRegistration { if (k instanceof EnumId) { EnumId id = (EnumId) k; EnumInfo info = (EnumInfo) v; - enumToId.put(info, id); int tmpDsId = PLACE_HOLDER_FOR_DS_ID & id.intValue(); if (tmpDsId == typeIdPrefix) { totalEnumIdInDS++; @@ -601,9 +589,8 @@ public class PeerTypeRegistration implements TypeRegistration { if (ei.equals(info)) { result = id; } - } else { - typeToId.put((PdxType) v, (Integer) k); } + localReverseMap.save(k, v, false); } if (totalEnumIdInDS == MAX_TYPE_ID) { @@ -663,12 +650,13 @@ public class PeerTypeRegistration implements TypeRegistration { public int defineEnum(final EnumInfo newInfo) { statistics.enumDefined(); verifyConfiguration(); - final EnumId existingId = enumToId.get(newInfo); + final EnumId existingId = localReverseMap.checkIfExistsInLocal(newInfo); if (existingId != null) { return existingId.intValue(); } lock(); try { + localReverseMap.flushLocalMap(); EnumId id = getExistingIdForEnum(newInfo); if (id != null) { return id.intValue(); @@ -678,10 +666,11 @@ public class PeerTypeRegistration implements TypeRegistration { updateIdToEnumRegion(id, newInfo); - enumToId.put(newInfo, id); + localReverseMap.save(id, newInfo, false); return id.intValue(); } finally { + localReverseMap.flushLocalMap(); unlock(); } } @@ -721,8 +710,10 @@ public class PeerTypeRegistration implements TypeRegistration { /** * adds a PdxType for a field to a {@code className => Set<PdxType>} map */ - private void updateClassToTypeMap(PdxType type) { - if (type != null) { + private void updateLocalMaps(Object key, Object value) { + localReverseMap.save(key, value, true); + if (value instanceof PdxType) { + PdxType type = (PdxType) value; synchronized (classToType) { if (type.getClassName().equals(JSONFormatter.JSON_CLASSNAME)) { return; // no need to include here @@ -788,4 +779,91 @@ 
public class PeerTypeRegistration implements TypeRegistration { public int getLocalSize() { return getIdToType().size(); } + + @VisibleForTesting + public int getTypeToIdSize() { + return localReverseMap.typeToIdSize(); + } + + @VisibleForTesting + public int getEnumToIdSize() { + return localReverseMap.enumToIdSize(); + } + + class LocalReverseMap { + /** + * When a new pdxType or a new enumInfo is added to the idToType region, its + * listener first adds the new type to pendingTypeToId, to make sure + * the distribution has finished. + * Then any member that wants to use this new pdxType has to get the dlock to + * flush the pendingTypeToId map into typeToId. This design guarantees that + * by the time the new pdxType is used, it has been distributed to all members. + */ + private final Map<PdxType, Integer> pendingTypeToId = + Collections.synchronizedMap(new HashMap<>()); + private final Map<EnumInfo, EnumId> pendingEnumToId = + Collections.synchronizedMap(new HashMap<>()); + + /** + * This map serves two purposes. It lets us look up an id based on a type, if we previously + * found + * that type in the region. And, if a type is present in this map, that means we read the type + * while holding the dlock, which means the type was distributed to all members. 
+ */ + private final Map<PdxType, Integer> typeToId = Collections.synchronizedMap(new HashMap<>()); + + private final Map<EnumInfo, EnumId> enumToId = Collections.synchronizedMap(new HashMap<>()); + + void save(Object key, Object value, boolean isPending) { + if (value instanceof PdxType) { + PdxType type = (PdxType) value; + if (isPending) { + pendingTypeToId.put(type, (Integer) key); + } else { + typeToId.put(type, (Integer) key); + } + } else if (value instanceof EnumInfo) { + EnumInfo info = (EnumInfo) value; + if (isPending) { + pendingEnumToId.put(info, (EnumId) key); + } else { + enumToId.put(info, (EnumId) key); + } + } + } + + int typeToIdSize() { + return typeToId.size(); + } + + int enumToIdSize() { + return enumToId.size(); + } + + Integer checkIfExistsInLocal(PdxType newType) { + return typeToId.get(newType); + } + + EnumId checkIfExistsInLocal(EnumInfo newInfo) { + return enumToId.get(newInfo); + } + + // The local maps should be loaded from the region if there is a mismatch in size between the + // region and all local maps + boolean shouldReloadFromRegion() { + return ((typeToId.size() + pendingTypeToId.size() + enumToId.size() + + pendingEnumToId.size()) != getIdToType().size()); + } + + void flushLocalMap() { + if (!pendingTypeToId.isEmpty()) { + typeToId.putAll(pendingTypeToId); + pendingTypeToId.clear(); + } + if (!pendingEnumToId.isEmpty()) { + enumToId.putAll(pendingEnumToId); + pendingEnumToId.clear(); + } + } + } } diff --git a/geode-core/src/test/java/org/apache/geode/pdx/internal/PeerTypeRegistrationTest.java b/geode-core/src/test/java/org/apache/geode/pdx/internal/PeerTypeRegistrationTest.java index 69991de..8244f19 100644 --- a/geode-core/src/test/java/org/apache/geode/pdx/internal/PeerTypeRegistrationTest.java +++ b/geode-core/src/test/java/org/apache/geode/pdx/internal/PeerTypeRegistrationTest.java @@ -32,6 +32,7 @@ import org.apache.geode.cache.Region; import org.apache.geode.distributed.internal.DistributionManager; import 
org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.TXManagerImpl; import org.apache.geode.internal.statistics.StatisticsManager; import org.apache.geode.pdx.PdxInitializationException; @@ -57,6 +58,9 @@ public class PeerTypeRegistrationTest { when(region.size()).thenReturn(1); when(internalCache.createVMRegion(eq(PeerTypeRegistration.REGION_NAME), any(), any())) .thenReturn(region); + when(region.getRegionService()).thenReturn(internalCache); + final TXManagerImpl txManager = mock(TXManagerImpl.class); + when(internalCache.getCacheTransactionManager()).thenReturn(txManager); peerTypeRegistration = new PeerTypeRegistration(internalCache); }
