SLIDER-600: re-read the role map information from the history file; currently no use is made of it. The history reader now also ignores any extra records in the history file, to make it resilient to change, especially on version rollback.
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/b2f060b0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/b2f060b0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/b2f060b0 Branch: refs/heads/develop Commit: b2f060b083f048f6e7ab191701425899bace14c1 Parents: f10bef8 Author: Steve Loughran <[email protected]> Authored: Thu Apr 9 13:15:17 2015 +0100 Committer: Steve Loughran <[email protected]> Committed: Thu Apr 9 13:15:17 2015 +0100 ---------------------------------------------------------------------- .../slider/server/avro/RoleHistoryRecord.avsc | 4 +- .../server/appmaster/state/RoleHistory.java | 12 ++++ .../slider/server/avro/LoadedRoleHistory.java | 18 +++++ .../slider/server/avro/RoleHistoryWriter.java | 72 +++++++++++++++----- 4 files changed, 87 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b2f060b0/slider-core/src/main/avro/org/apache/slider/server/avro/RoleHistoryRecord.avsc ---------------------------------------------------------------------- diff --git a/slider-core/src/main/avro/org/apache/slider/server/avro/RoleHistoryRecord.avsc b/slider-core/src/main/avro/org/apache/slider/server/avro/RoleHistoryRecord.avsc index 778a728..3667c01 100644 --- a/slider-core/src/main/avro/org/apache/slider/server/avro/RoleHistoryRecord.avsc +++ b/slider-core/src/main/avro/org/apache/slider/server/avro/RoleHistoryRecord.avsc @@ -77,8 +77,8 @@ "name": "rolemap", "type": { "type": "map", - "values": "long" - } + "values": "int" + } } ] }, http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b2f060b0/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java ---------------------------------------------------------------------- diff --git 
a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java index e95e2a6..bb482af 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java @@ -895,6 +895,18 @@ public class RoleHistory { } /** + * Build the mapping entry for persisting to the role history + * @return a mapping object + */ + public synchronized Map<CharSequence, Integer> buildMappingForHistoryFile() { + Map<CharSequence, Integer> mapping = new HashMap<>(getRoleSize()); + for (ProviderRole role : providerRoles) { + mapping.put(role.name, role.id); + } + return mapping; + } + + /** * Get a clone of the available list * @param role role index * @return a clone of the list http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b2f060b0/slider-core/src/main/java/org/apache/slider/server/avro/LoadedRoleHistory.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/avro/LoadedRoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/avro/LoadedRoleHistory.java index 866e716..3947064 100644 --- a/slider-core/src/main/java/org/apache/slider/server/avro/LoadedRoleHistory.java +++ b/slider-core/src/main/java/org/apache/slider/server/avro/LoadedRoleHistory.java @@ -19,6 +19,7 @@ package org.apache.slider.server.avro; import org.apache.hadoop.fs.Path; +import org.apache.slider.common.tools.SliderUtils; import java.util.ArrayList; import java.util.HashMap; @@ -38,10 +39,18 @@ public class LoadedRoleHistory { public final List<NodeEntryRecord> records = new ArrayList<>(); + /** + * Add a record + * @param record + */ public void add(NodeEntryRecord record) { records.add(record); } + /** + * Number of loaded records + * @return + */ public int size() { return records.size(); 
} @@ -60,4 +69,13 @@ public class LoadedRoleHistory { public void setPath(Path path) { this.path = path; } + + public void buildMapping(Map<CharSequence, Integer> source) { + roleMap.clear(); + for (Map.Entry<CharSequence, Integer> entry : source.entrySet()) { + roleMap.put(SliderUtils.sequenceToString(entry.getKey()), + entry.getValue()); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/b2f060b0/slider-core/src/main/java/org/apache/slider/server/avro/RoleHistoryWriter.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/avro/RoleHistoryWriter.java b/slider-core/src/main/java/org/apache/slider/server/avro/RoleHistoryWriter.java index b638ae9..64929b4 100644 --- a/slider-core/src/main/java/org/apache/slider/server/avro/RoleHistoryWriter.java +++ b/slider-core/src/main/java/org/apache/slider/server/avro/RoleHistoryWriter.java @@ -39,7 +39,6 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.slider.common.SliderKeys; import org.apache.slider.common.tools.SliderUtils; import org.apache.slider.core.exceptions.BadConfigException; -import org.apache.slider.providers.ProviderRole; import org.apache.slider.server.appmaster.state.NodeEntry; import org.apache.slider.server.appmaster.state.NodeInstance; import org.apache.slider.server.appmaster.state.RoleHistory; @@ -57,6 +56,7 @@ import java.util.Collections; import java.util.List; import java.util.ListIterator; import java.util.Locale; +import java.util.Map; /** * Write out the role history to an output stream. 
@@ -89,17 +89,13 @@ public class RoleHistoryWriter { DatumWriter<RoleHistoryRecord> writer = new SpecificDatumWriter<>(RoleHistoryRecord.class); + RoleHistoryRecord record = createHeaderRecord(savetime, history); int roles = history.getRoleSize(); - RoleHistoryHeader header = new RoleHistoryHeader(); - header.setVersion(ROLE_HISTORY_VERSION); - header.setSaved(savetime); - header.setSavedx(Long.toHexString(savetime)); - header.setSavedate(SliderUtils.toGMTString(savetime)); - header.setRoles(roles); - RoleHistoryRecord record = new RoleHistoryRecord(header); Schema schema = record.getSchema(); Encoder encoder = EncoderFactory.get().jsonEncoder(schema, out); writer.write(record, encoder); + // now write the rolemap record + writer.write(createRolemapRecord(history), encoder); long count = 0; //now for every role history entry, write out its record Collection<NodeInstance> instances = history.cloneNodemap().values(); @@ -127,6 +123,34 @@ public class RoleHistoryWriter { } /** + * Create the header record + * @param savetime time of save + * @param history history + * @return a record to place at the head of the file + */ + private RoleHistoryRecord createHeaderRecord(long savetime, RoleHistory history) { + RoleHistoryHeader header = new RoleHistoryHeader(); + header.setVersion(ROLE_HISTORY_VERSION); + header.setSaved(savetime); + header.setSavedx(Long.toHexString(savetime)); + header.setSavedate(SliderUtils.toGMTString(savetime)); + header.setRoles(history.getRoleSize()); + return new RoleHistoryRecord(header); + } + + /** + * Create the role record + * @param history history + * @return a record to place at the head of the file + */ + private RoleHistoryRecord createRolemapRecord(RoleHistory history) { + RoleHistoryMapping entry = new RoleHistoryMapping(); + Map<CharSequence, Integer> mapping = history.buildMappingForHistoryFile(); + entry.setRolemap(mapping); + return new RoleHistoryRecord(entry); + } + + /** * Write write the file * * @@ -210,25 +234,38 @@ public 
class RoleHistoryWriter { ROLE_HISTORY_VERSION)); } loadedRoleHistory.setHeader(header); - RoleHistoryFooter footer; + RoleHistoryFooter footer = null; int records = 0; //go through reading data try { - while (true) { + while (footer == null) { record = reader.read(null, decoder); entry = record.getEntry(); if (entry instanceof RoleHistoryHeader) { throw new IOException("Duplicate Role History Header found"); - } - if (entry instanceof RoleHistoryFooter) { + } else if (entry instanceof RoleHistoryMapping) { + // role history mapping entry + if (!loadedRoleHistory.roleMap.isEmpty()) { + // duplicate role maps are viewed as something to warn over, rather than fail + log.warn("Duplicate role map; ignoring"); + } else { + RoleHistoryMapping historyMapping = (RoleHistoryMapping) entry; + loadedRoleHistory.buildMapping(historyMapping.getRolemap()); + } + } else if (entry instanceof NodeEntryRecord) { + // normal record + records++; + NodeEntryRecord nodeEntryRecord = (NodeEntryRecord) entry; + loadedRoleHistory.add(nodeEntryRecord); + } else if (entry instanceof RoleHistoryFooter) { //tail end of the file footer = (RoleHistoryFooter) entry; - break; + } else { + // this is to handle future versions, such as when rolling back + // from a later version of slider + log.warn("Discarding unknown record {}", entry); } - records++; - NodeEntryRecord nodeEntryRecord = (NodeEntryRecord) entry; - loadedRoleHistory.add(nodeEntryRecord); } } catch (EOFException e) { EOFException ex = new EOFException( @@ -236,7 +273,8 @@ public class RoleHistoryWriter { ex.initCause(e); throw ex; } - //at this point there should be no data left. + // at this point there should be no data left. + // check by reading and expecting a -1 if (in.read() > 0) { // footer is in stream before the last record throw new EOFException(
